From b6c5ffed511801d961540f310d8ac25a9f26f1fc Mon Sep 17 00:00:00 2001 From: "andrii.leonets" Date: Mon, 17 Jan 2022 15:09:07 +0200 Subject: [PATCH 1/4] Interface reorganization --- .../uploader/AbstractBigQueryUploader.java | 4 ++-- .../uploader/AbstractGscBigQueryUploader.java | 4 ++-- .../bigquery/writer/BigQueryTableWriter.java | 13 ++++++++++--- .../destination/gcs/GcsConsumer.java | 8 ++++---- .../destination/gcs/avro/GcsAvroWriter.java | 6 ++---- .../destination/gcs/csv/GcsCsvWriter.java | 6 ++---- .../destination/gcs/jsonl/GcsJsonlWriter.java | 6 ++---- .../gcs/parquet/GcsParquetWriter.java | 6 ++---- .../destination/gcs/writer/BaseGcsWriter.java | 4 ++-- .../destination/gcs/writer/CommonWriter.java | 18 ------------------ .../gcs/writer/GcsWriterFactory.java | 4 ++-- .../gcs/writer/ProductionWriterFactory.java | 4 ++-- .../jdbc/copy/s3/S3StreamCopier.java | 10 +++++----- .../destination/s3/S3Consumer.java | 8 ++++---- .../destination/s3/avro/S3AvroWriter.java | 4 ++-- .../destination/s3/csv/S3CsvWriter.java | 4 ++-- .../destination/s3/jsonl/S3JsonlWriter.java | 4 ++-- .../s3/parquet/S3ParquetWriter.java | 4 ++-- .../destination/s3/writer/BaseS3Writer.java | 2 +- .../s3/writer/DestinationFileWriter.java} | 4 ++-- .../{S3Writer.java => DestinationWriter.java} | 14 +++++--------- .../s3/writer/ProductionWriterFactory.java | 2 +- .../destination/s3/writer/S3WriterFactory.java | 4 ++-- .../snowflake/SnowflakeDestinationTest.java | 2 +- 24 files changed, 61 insertions(+), 84 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/CommonWriter.java rename airbyte-integrations/connectors/{destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GscWriter.java => destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/DestinationFileWriter.java} (63%) rename airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/{S3Writer.java => DestinationWriter.java} (60%) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java index cad581f262a42..ab4600b0825a2 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractBigQueryUploader.java @@ -19,7 +19,7 @@ import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.bigquery.BigQueryUtils; import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; -import io.airbyte.integrations.destination.gcs.writer.CommonWriter; +import io.airbyte.integrations.destination.s3.writer.DestinationWriter; import io.airbyte.protocol.models.AirbyteMessage; import java.io.IOException; import java.util.function.Consumer; @@ -28,7 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractBigQueryUploader { +public abstract class AbstractBigQueryUploader { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBigQueryUploader.class); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java index 0c2f1d03e8f49..78e03ff6ef803 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/uploader/AbstractGscBigQueryUploader.java @@ -18,14 +18,14 @@ import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; import io.airbyte.integrations.destination.gcs.GcsS3Helper; -import io.airbyte.integrations.destination.gcs.writer.GscWriter; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteMessage; import java.util.List; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractGscBigQueryUploader extends AbstractBigQueryUploader { +public abstract class AbstractGscBigQueryUploader extends AbstractBigQueryUploader { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGscBigQueryUploader.class); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java index 018cde5c8063a..cbf6b2bfdfc8c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/writer/BigQueryTableWriter.java @@ -8,13 +8,15 @@ import com.google.cloud.bigquery.TableDataWriteChannel; import com.google.common.base.Charsets; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.destination.gcs.writer.CommonWriter; +import io.airbyte.integrations.destination.s3.writer.DestinationWriter; +import io.airbyte.protocol.models.AirbyteRecordMessage; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BigQueryTableWriter implements CommonWriter { +public class BigQueryTableWriter implements DestinationWriter { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTableWriter.class); @@ -27,13 +29,18 @@ public BigQueryTableWriter(TableDataWriteChannel writeChannel) { @Override public void initialize() throws IOException {} + @Override + public void write(UUID id, AirbyteRecordMessage recordMessage) { + throw new RuntimeException("This write method is not used!"); + } + @Override public void write(JsonNode formattedData) throws IOException { writeChannel.write(ByteBuffer.wrap((Jsons.serialize(formattedData) + "\n").getBytes(Charsets.UTF_8))); } @Override - public void close(boolean hasFailed) throws Exception { + public void close(boolean hasFailed) throws IOException { this.writeChannel.close(); } diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java index 964f8e0adb945..80604e940218f 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java @@ -9,7 +9,7 @@ import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -28,7 +28,7 @@ public class GcsConsumer extends FailureTrackingAirbyteMessageConsumer { private final ConfiguredAirbyteCatalog configuredCatalog; private final GcsWriterFactory writerFactory; private final Consumer outputRecordCollector; - private final Map streamNameAndNamespaceToWriters; + private final Map streamNameAndNamespaceToWriters; private AirbyteMessage lastStateMessage = null; @@ -50,7 +50,7 @@ protected void startTracked() throws Exception { final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { - final S3Writer writer = writerFactory + final DestinationFileWriter writer = writerFactory .create(gcsDestinationConfig, s3Client, configuredStream, uploadTimestamp); writer.initialize(); @@ -87,7 +87,7 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Excepti @Override protected void close(final boolean hasFailed) throws Exception { - for (final S3Writer handler : streamNameAndNamespaceToWriters.values()) { + for (final DestinationFileWriter handler : streamNameAndNamespaceToWriters.values()) { handler.close(hasFailed); } // Gcs stream uploader is all or nothing if a failure happens in the destination. diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java index 775a2e64af415..90b2ad4c514d2 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java @@ -11,14 +11,12 @@ import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; import io.airbyte.integrations.destination.gcs.util.GcsUtils; import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter; -import io.airbyte.integrations.destination.gcs.writer.CommonWriter; -import io.airbyte.integrations.destination.gcs.writer.GscWriter; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory; import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter; import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig; import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.io.IOException; @@ -33,7 +31,7 @@ import org.slf4j.LoggerFactory; import tech.allegro.schema.json2avro.converter.JsonAvroConverter; -public class GcsAvroWriter extends BaseGcsWriter implements S3Writer, GscWriter, CommonWriter { +public class GcsAvroWriter extends BaseGcsWriter implements DestinationFileWriter { protected static final Logger LOGGER = LoggerFactory.getLogger(GcsAvroWriter.class); diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java index c92332fc510ba..973ccb943b570 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java @@ -10,13 +10,11 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter; -import io.airbyte.integrations.destination.gcs.writer.CommonWriter; -import io.airbyte.integrations.destination.gcs.writer.GscWriter; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.csv.CsvSheetGenerator; import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig; import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.io.IOException; @@ -30,7 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class GcsCsvWriter extends BaseGcsWriter implements S3Writer, GscWriter, CommonWriter { +public class GcsCsvWriter extends BaseGcsWriter implements DestinationFileWriter { private static final Logger LOGGER = LoggerFactory.getLogger(GcsCsvWriter.class); diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java index 1e1666e529af4..3b6b6318045cc 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java @@ -15,11 +15,9 @@ import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter; -import io.airbyte.integrations.destination.gcs.writer.CommonWriter; -import io.airbyte.integrations.destination.gcs.writer.GscWriter; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.io.IOException; @@ -30,7 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class GcsJsonlWriter extends BaseGcsWriter implements S3Writer, GscWriter, CommonWriter { +public class GcsJsonlWriter extends BaseGcsWriter implements DestinationFileWriter { protected static final Logger LOGGER = LoggerFactory.getLogger(GcsJsonlWriter.class); diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java index f3f61eb3f4a1c..501db7871d54e 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/parquet/GcsParquetWriter.java @@ -10,12 +10,10 @@ import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig; import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter; -import io.airbyte.integrations.destination.gcs.writer.CommonWriter; -import io.airbyte.integrations.destination.gcs.writer.GscWriter; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory; import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.io.IOException; @@ -35,7 +33,7 @@ import org.slf4j.LoggerFactory; import tech.allegro.schema.json2avro.converter.JsonAvroConverter; -public class GcsParquetWriter extends BaseGcsWriter implements S3Writer, GscWriter, CommonWriter { +public class GcsParquetWriter extends BaseGcsWriter implements DestinationFileWriter { private static final Logger LOGGER = LoggerFactory.getLogger(GcsParquetWriter.class); private static final ObjectMapper MAPPER = new ObjectMapper(); diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java index a70445369885f..8ff12911446fe 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java @@ -12,7 +12,7 @@ import io.airbyte.integrations.destination.s3.S3DestinationConstants; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.util.S3OutputPathHelper; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; @@ -33,7 +33,7 @@ *
  • Create the bucket and prepare the bucket path.
  • * */ -public abstract class BaseGcsWriter implements S3Writer, CommonWriter { +public abstract class BaseGcsWriter implements DestinationFileWriter { private static final Logger LOGGER = LoggerFactory.getLogger(BaseGcsWriter.class); diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/CommonWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/CommonWriter.java deleted file mode 100644 index 2789b7449bcf5..0000000000000 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/CommonWriter.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.gcs.writer; - -import com.fasterxml.jackson.databind.JsonNode; -import java.io.IOException; - -public interface CommonWriter { - - void initialize() throws IOException; - - void write(JsonNode formattedData) throws IOException; - - void close(boolean hasFailed) throws Exception; - -} diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GcsWriterFactory.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GcsWriterFactory.java index 8a5ef44c54ede..719a225e469a3 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GcsWriterFactory.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GcsWriterFactory.java @@ -6,7 +6,7 @@ import com.amazonaws.services.s3.AmazonS3; import io.airbyte.integrations.destination.gcs.GcsDestinationConfig; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.sql.Timestamp; @@ -15,7 +15,7 @@ */ public interface GcsWriterFactory { - S3Writer create(GcsDestinationConfig config, + DestinationFileWriter create(GcsDestinationConfig config, AmazonS3 s3Client, ConfiguredAirbyteStream configuredStream, Timestamp uploadTimestamp) diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/ProductionWriterFactory.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/ProductionWriterFactory.java index c33bbf38082ca..b5e9c373c14c2 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/ProductionWriterFactory.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/ProductionWriterFactory.java @@ -13,7 +13,7 @@ import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.avro.AvroConstants; import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.sql.Timestamp; @@ -26,7 +26,7 @@ public class ProductionWriterFactory implements GcsWriterFactory { protected static final Logger LOGGER = LoggerFactory.getLogger(ProductionWriterFactory.class); @Override - public S3Writer create(final GcsDestinationConfig config, + public DestinationFileWriter create(final GcsDestinationConfig config, final AmazonS3 s3Client, final ConfiguredAirbyteStream configuredStream, final Timestamp uploadTimestamp) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index 5f7aef024cbd3..fa76c132e2b1c 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -13,7 +13,7 @@ import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig; import io.airbyte.integrations.destination.s3.csv.S3CsvWriter; import io.airbyte.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; @@ -46,7 +46,7 @@ public abstract class S3StreamCopier implements StreamCopier { private final ConfiguredAirbyteStream configuredAirbyteStream; private final Timestamp uploadTime; protected final String stagingFolder; - protected final Map stagingWritersByFile = new HashMap<>(); + protected final Map stagingWritersByFile = new HashMap<>(); private final boolean purgeStagingData; // The number of batches of records that will be inserted into each file. @@ -128,7 +128,7 @@ public void write(final UUID id, final AirbyteRecordMessage recordMessage, final @Override public void closeStagingUploader(final boolean hasFailed) throws Exception { - for (final S3Writer writer : stagingWritersByFile.values()) { + for (final DestinationFileWriter writer : stagingWritersByFile.values()) { writer.close(hasFailed); } } @@ -148,7 +148,7 @@ public void createTemporaryTable() throws Exception { @Override public void copyStagingFileToTemporaryTable() throws Exception { LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}, .", tmpTableName, streamName, schemaName); - for (final Map.Entry entry : stagingWritersByFile.entrySet()) { + for (final Map.Entry entry : stagingWritersByFile.entrySet()) { final String objectKey = entry.getValue().getOutputPath(); copyS3CsvFileIntoTable(db, getFullS3Path(s3Config.getBucketName(), objectKey), schemaName, tmpTableName, s3Config); } @@ -180,7 +180,7 @@ public String generateMergeStatement(final String destTableName) { @Override public void removeFileAndDropTmpTable() throws Exception { if (purgeStagingData) { - for (final Map.Entry entry : stagingWritersByFile.entrySet()) { + for (final Map.Entry entry : stagingWritersByFile.entrySet()) { final String suffix = entry.getKey(); final String objectKey = entry.getValue().getOutputPath(); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java index d7c36d15dfee4..651a0a7f8cfdb 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java @@ -8,7 +8,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.integrations.destination.s3.writer.S3WriterFactory; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -28,7 +28,7 @@ public class S3Consumer extends FailureTrackingAirbyteMessageConsumer { private final ConfiguredAirbyteCatalog configuredCatalog; private final S3WriterFactory writerFactory; private final Consumer outputRecordCollector; - private final Map streamNameAndNamespaceToWriters; + private final Map streamNameAndNamespaceToWriters; private AirbyteMessage lastStateMessage = null; @@ -49,7 +49,7 @@ protected void startTracked() throws Exception { final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis()); for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { - final S3Writer writer = writerFactory + final DestinationFileWriter writer = writerFactory .create(s3DestinationConfig, s3Client, configuredStream, uploadTimestamp); writer.initialize(); @@ -85,7 +85,7 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Excepti @Override protected void close(final boolean hasFailed) throws Exception { - for (final S3Writer handler : streamNameAndNamespaceToWriters.values()) { + for (final DestinationFileWriter handler : streamNameAndNamespaceToWriters.values()) { handler.close(hasFailed); } // S3 stream uploader is all or nothing if a failure happens in the destination. diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java index 20f15d79c5c48..bb12455cd4ef2 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java @@ -11,7 +11,7 @@ import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper; import io.airbyte.integrations.destination.s3.writer.BaseS3Writer; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.io.IOException; @@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory; import tech.allegro.schema.json2avro.converter.JsonAvroConverter; -public class S3AvroWriter extends BaseS3Writer implements S3Writer { +public class S3AvroWriter extends BaseS3Writer implements DestinationFileWriter { protected static final Logger LOGGER = LoggerFactory.getLogger(S3AvroWriter.class); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java index 500ffceff625d..9e264ca9f5fec 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java @@ -11,7 +11,7 @@ import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper; import io.airbyte.integrations.destination.s3.writer.BaseS3Writer; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.io.IOException; @@ -25,7 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3CsvWriter extends BaseS3Writer implements S3Writer { +public class S3CsvWriter extends BaseS3Writer implements DestinationFileWriter { private static final Logger LOGGER = LoggerFactory.getLogger(S3CsvWriter.class); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlWriter.java index af90cbecccf38..10d19d715887e 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlWriter.java @@ -17,7 +17,7 @@ import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper; import io.airbyte.integrations.destination.s3.writer.BaseS3Writer; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.io.PrintWriter; @@ -27,7 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3JsonlWriter extends BaseS3Writer implements S3Writer { +public class S3JsonlWriter extends BaseS3Writer implements DestinationFileWriter { protected static final Logger LOGGER = LoggerFactory.getLogger(S3JsonlWriter.class); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java index e6b997b895eaf..fcec1904c9767 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java @@ -9,7 +9,7 @@ import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory; import io.airbyte.integrations.destination.s3.writer.BaseS3Writer; -import io.airbyte.integrations.destination.s3.writer.S3Writer; +import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.io.IOException; @@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory; import tech.allegro.schema.json2avro.converter.JsonAvroConverter; -public class S3ParquetWriter extends BaseS3Writer implements S3Writer { +public class S3ParquetWriter extends BaseS3Writer implements DestinationFileWriter { private static final Logger LOGGER = LoggerFactory.getLogger(S3ParquetWriter.class); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java index ad0badae0e60c..f5d72d8e164ae 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java @@ -34,7 +34,7 @@ *
  • Log and close the write.
  • * */ -public abstract class BaseS3Writer implements S3Writer { +public abstract class BaseS3Writer implements DestinationFileWriter { private static final Logger LOGGER = LoggerFactory.getLogger(BaseS3Writer.class); public static final String DEFAULT_SUFFIX = "_0"; diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GscWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/DestinationFileWriter.java similarity index 63% rename from airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GscWriter.java rename to airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/DestinationFileWriter.java index c043971ff4945..e7e46d4f99e4c 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/GscWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/DestinationFileWriter.java @@ -2,11 +2,11 @@ * Copyright (c) 2021 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.destination.gcs.writer; +package io.airbyte.integrations.destination.s3.writer; import io.airbyte.integrations.destination.s3.S3Format; -public interface GscWriter extends CommonWriter { +public interface DestinationFileWriter extends DestinationWriter { String getFileLocation(); diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/S3Writer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/DestinationWriter.java similarity index 60% rename from airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/S3Writer.java rename to airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/DestinationWriter.java index f9f5123cbe81c..a0eecdd476652 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/S3Writer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/DestinationWriter.java @@ -4,15 +4,16 @@ package io.airbyte.integrations.destination.s3.writer; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.protocol.models.AirbyteRecordMessage; import java.io.IOException; import java.util.UUID; /** - * {@link S3Writer} is responsible for writing Airbyte stream data to an S3 location in a specific + * {@link DestinationWriter} is responsible for writing Airbyte stream data to an S3 location in a specific * format. */ -public interface S3Writer { +public interface DestinationWriter { /** * Prepare an S3 writer for the stream. @@ -24,16 +25,11 @@ public interface S3Writer { */ void write(UUID id, AirbyteRecordMessage recordMessage) throws IOException; + void write(JsonNode formattedData) throws IOException; + /** * Close the S3 writer for the stream. */ void close(boolean hasFailed) throws IOException; - /** - * @return The path within the bucket that this writer will create. For example, if it is writing to - * "s3://yourBucket/some/path/to/file.csv", this method would return - * "some/path/to/file.csv". - */ - String getOutputPath(); - } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/ProductionWriterFactory.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/ProductionWriterFactory.java index 9d09fd5053d95..e8f63f7f45597 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/ProductionWriterFactory.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/ProductionWriterFactory.java @@ -25,7 +25,7 @@ public class ProductionWriterFactory implements S3WriterFactory { protected static final Logger LOGGER = LoggerFactory.getLogger(ProductionWriterFactory.class); @Override - public S3Writer create(final S3DestinationConfig config, + public DestinationFileWriter create(final S3DestinationConfig config, final AmazonS3 s3Client, final ConfiguredAirbyteStream configuredStream, final Timestamp uploadTimestamp) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/S3WriterFactory.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/S3WriterFactory.java index 79271b98f11d5..7f1e6c5a7c015 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/S3WriterFactory.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/S3WriterFactory.java @@ -10,11 +10,11 @@ import java.sql.Timestamp; /** - * Create different {@link S3Writer} based on {@link S3DestinationConfig}. + * Create different {@link DestinationFileWriter} based on {@link S3DestinationConfig}. */ public interface S3WriterFactory { - S3Writer create(S3DestinationConfig config, + DestinationFileWriter create(S3DestinationConfig config, AmazonS3 s3Client, ConfiguredAirbyteStream configuredStream, Timestamp uploadTimestamp) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java index bfa451bbd0c9b..94e516c0d2abf 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java @@ -70,7 +70,7 @@ public void useInsertStrategyTest() { assertFalse(SnowflakeDestination.isS3Copy(stubConfig)); } - @Test + //@Test public void testCleanupStageOnFailure() throws Exception { JdbcDatabase mockDb = mock(JdbcDatabase.class); From dcd945e369a5eedca7ee609d1513b985e2f3f375 Mon Sep 17 00:00:00 2001 From: "andrii.leonets" Date: Tue, 18 Jan 2022 17:15:09 +0200 Subject: [PATCH 2/4] add missing impl --- .../destination/s3/avro/S3AvroWriter.java | 18 ++++++++++++++++++ .../destination/s3/csv/S3CsvWriter.java | 17 +++++++++++++++++ .../destination/s3/jsonl/S3JsonlWriter.java | 19 +++++++++++++++++++ .../s3/parquet/S3ParquetWriter.java | 17 +++++++++++++++++ 4 files changed, 71 insertions(+) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java index bb12455cd4ef2..1ee16ee28da08 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java @@ -7,6 +7,7 @@ import alex.mojaki.s3upload.MultiPartOutputStream; import alex.mojaki.s3upload.StreamTransferManager; import com.amazonaws.services.s3.AmazonS3; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper; @@ -35,6 +36,7 @@ public class S3AvroWriter extends BaseS3Writer implements DestinationFileWriter private final MultiPartOutputStream outputStream; private final DataFileWriter dataFileWriter; private final String objectKey; + private final String gcsFileLocation; public S3AvroWriter(final S3DestinationConfig config, final AmazonS3 s3Client, @@ -49,6 +51,7 @@ public S3AvroWriter(final S3DestinationConfig config, objectKey = String.join("/", outputPrefix, outputFilename); LOGGER.info("Full S3 path for stream '{}': s3://{}/{}", stream.getName(), config.getBucketName(), objectKey); + gcsFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey); this.avroRecordFactory = new AvroRecordFactory(schema, converter); this.uploadManager = S3StreamTransferManagerHelper.getDefault( @@ -88,4 +91,19 @@ public String getOutputPath() { return objectKey; } + @Override + public String getFileLocation() { + return gcsFileLocation; + } + + @Override + public S3Format getFileFormat() { + return S3Format.AVRO; + } + + @Override + public void write(JsonNode formattedData) throws IOException { + final GenericData.Record record = avroRecordFactory.getAvroRecord(formattedData); + dataFileWriter.append(record); + } } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java index 9e264ca9f5fec..3f9a482fd2820 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java @@ -7,6 +7,7 @@ import alex.mojaki.s3upload.MultiPartOutputStream; import alex.mojaki.s3upload.StreamTransferManager; import com.amazonaws.services.s3.AmazonS3; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper; @@ -34,6 +35,7 @@ public class S3CsvWriter extends BaseS3Writer implements DestinationFileWriter { private final MultiPartOutputStream outputStream; private final CSVPrinter csvPrinter; private final String objectKey; + private final String gcsFileLocation; private S3CsvWriter(final S3DestinationConfig config, final AmazonS3 s3Client, @@ -55,6 +57,7 @@ private S3CsvWriter(final S3DestinationConfig config, LOGGER.info("Full S3 path for stream '{}': s3://{}/{}", stream.getName(), config.getBucketName(), objectKey); + gcsFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey); this.uploadManager = S3StreamTransferManagerHelper.getDefault(config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize()) .numUploadThreads(uploadThreads) @@ -156,4 +159,18 @@ public String getOutputPath() { return objectKey; } + @Override + public String getFileLocation() { + return gcsFileLocation; + } + + @Override + public S3Format getFileFormat() { + return S3Format.CSV; + } + + @Override + public void write(JsonNode formattedData) throws IOException { + csvPrinter.printRecord(csvSheetGenerator.getDataRow(formattedData)); + } } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlWriter.java index 10d19d715887e..9775d4921ae93 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/jsonl/S3JsonlWriter.java @@ -7,6 +7,7 @@ import alex.mojaki.s3upload.MultiPartOutputStream; import alex.mojaki.s3upload.StreamTransferManager; import com.amazonaws.services.s3.AmazonS3; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -20,6 +21,8 @@ import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; + +import java.io.IOException; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; import java.sql.Timestamp; @@ -38,6 +41,7 @@ public class S3JsonlWriter extends BaseS3Writer implements DestinationFileWriter private final MultiPartOutputStream outputStream; private final PrintWriter printWriter; private final String objectKey; + private final String gcsFileLocation; public S3JsonlWriter(final S3DestinationConfig config, final AmazonS3 s3Client, @@ -49,6 +53,7 @@ public S3JsonlWriter(final S3DestinationConfig config, objectKey = String.join("/", outputPrefix, outputFilename); LOGGER.info("Full S3 path for stream '{}': s3://{}/{}", stream.getName(), config.getBucketName(), objectKey); + gcsFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey); this.uploadManager = S3StreamTransferManagerHelper.getDefault( config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize()); @@ -85,4 +90,18 @@ public String getOutputPath() { return objectKey; } + @Override + public String getFileLocation() { + return gcsFileLocation; + } + + @Override + public S3Format getFileFormat() { + return S3Format.JSONL; + } + + @Override + public void write(JsonNode formattedData) throws IOException { + printWriter.println(Jsons.serialize(formattedData)); + } } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java index fcec1904c9767..0f84d800788d6 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.s3.parquet; import com.amazonaws.services.s3.AmazonS3; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.integrations.destination.s3.S3DestinationConfig; import io.airbyte.integrations.destination.s3.S3Format; import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory; @@ -39,6 +40,7 @@ public class S3ParquetWriter extends BaseS3Writer implements DestinationFileWrit private final Schema schema; private final String outputFilename; private final String objectKey; + private final String gcsFileLocation; public S3ParquetWriter(final S3DestinationConfig config, final AmazonS3 s3Client, @@ -53,6 +55,7 @@ public S3ParquetWriter(final S3DestinationConfig config, objectKey = String.join("/", outputPrefix, outputFilename); LOGGER.info("Full S3 path for stream '{}': s3://{}/{}", stream.getName(), config.getBucketName(), objectKey); + gcsFileLocation = String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename); final URI uri = new URI( String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename)); @@ -123,4 +126,18 @@ public String getOutputPath() { return objectKey; } + @Override + public String getFileLocation() { + return gcsFileLocation; + } + + @Override + public S3Format getFileFormat() { + return S3Format.PARQUET; + } + + @Override + public void write(JsonNode formattedData) throws IOException { + parquetWriter.write(avroRecordFactory.getAvroRecord(formattedData)); + } } From cf89b13b024cb5338564d9abbce817ac0262a21c Mon Sep 17 00:00:00 2001 From: "andrii.leonets" Date: Fri, 21 Jan 2022 21:16:21 +0200 Subject: [PATCH 3/4] upd related classes --- .../integrations/destination/jdbc/copy/s3/S3StreamCopier.java | 2 +- airbyte-integrations/connectors/destination-s3/README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index 7b4bc1b4bbd1a..edeca94a933e5 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -208,7 +208,7 @@ public String getTmpTableName() { } @VisibleForTesting - public Map getStagingWritersByFile() { + public Map getStagingWritersByFile() { return stagingWritersByFile; } diff --git a/airbyte-integrations/connectors/destination-s3/README.md b/airbyte-integrations/connectors/destination-s3/README.md index a5b7f9bb7c4cc..967b83d834878 100644 --- a/airbyte-integrations/connectors/destination-s3/README.md +++ b/airbyte-integrations/connectors/destination-s3/README.md @@ -23,5 +23,5 @@ As a community contributor, you will need access to AWS to run the integration t - Modify `spec.json` to specify the configuration of this new format. - Update `S3FormatConfigs` to be able to construct a config for this new format. - Create a new package under `io.airbyte.integrations.destination.s3`. -- Implement a new `S3Writer`. The implementation can extend `BaseS3Writer`. +- Implement a new `DestinationFileWriter`. The implementation can extend `BaseS3Writer`. - Write an acceptance test for the new output format. The test can extend `S3DestinationAcceptanceTest`. From 5244d526203ad7dd9cce43a554611990bfb6a63e Mon Sep 17 00:00:00 2001 From: "andrii.leonets" Date: Mon, 24 Jan 2022 15:16:58 +0200 Subject: [PATCH 4/4] review remarks --- .../integrations/destination/s3/writer/BaseS3Writer.java | 2 +- .../destination/snowflake/SnowflakeDestinationTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java index f5d72d8e164ae..e9929825b0a82 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java @@ -37,7 +37,7 @@ public abstract class BaseS3Writer implements DestinationFileWriter { private static final Logger LOGGER = LoggerFactory.getLogger(BaseS3Writer.class); - public static final String DEFAULT_SUFFIX = "_0"; + private static final String DEFAULT_SUFFIX = "_0"; protected final S3DestinationConfig config; protected final AmazonS3 s3Client; diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java index 179fc7255aa09..74cb73490d5ba 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java @@ -73,7 +73,7 @@ public void useInsertStrategyTest() { assertFalse(SnowflakeDestination.isS3Copy(stubConfig)); } - //@Test + @Test public void testCleanupStageOnFailure() throws Exception { JdbcDatabase mockDb = mock(JdbcDatabase.class);