From 5b858da70a5ce53fbca340e86181469dd74b7523 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 21 Jan 2022 09:59:25 +0200 Subject: [PATCH 1/6] updated azure blob storage --- .../b4c5d105-31fd-4817-96b6-cb923bfc04cb.json | 2 +- .../resources/seed/destination_specs.yaml | 2 +- .../destination-azure-blob-storage/Dockerfile | 2 +- .../AzureBlobStorageConsumer.java | 74 +++++++++++-------- .../csv/AzureBlobStorageCsvWriter.java | 28 +++---- .../jsonl/AzureBlobStorageJsonlWriter.java | 3 +- .../writer/AzureBlobStorageWriterFactory.java | 3 +- .../writer/ProductionWriterFactory.java | 9 +-- ...obStorageCsvDestinationAcceptanceTest.java | 35 ++++++++- ...eBlobStorageDestinationAcceptanceTest.java | 45 +++++++---- ...StorageJsonlDestinationAcceptanceTest.java | 21 ++++++ 11 files changed, 142 insertions(+), 82 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json index 3c436bbd29d6..ecb42472e00f 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/b4c5d105-31fd-4817-96b6-cb923bfc04cb.json @@ -2,7 +2,7 @@ "destinationDefinitionId": "b4c5d105-31fd-4817-96b6-cb923bfc04cb", "name": "Azure Blob Storage", "dockerRepository": "airbyte/destination-azure-blob-storage", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.2", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/azureblobstorage", "icon": "azureblobstorage.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 6f9cb6856d36..d44a5520ecf4 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -1,7 +1,7 @@ # This file is generated by io.airbyte.config.specs.SeedConnectorSpecGenerator. # Do NOT edit this file directly. See generator class for more details. --- -- dockerImage: "airbyte/destination-azure-blob-storage:0.1.1" +- dockerImage: "airbyte/destination-azure-blob-storage:0.1.2" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/azureblobstorage" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile b/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile index 8e644aa025fe..5b076391c473 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-azure-blob-storage COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.1 +LABEL io.airbyte.version=0.1.2 LABEL io.airbyte.name=airbyte/destination-azure-blob-storage diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java index 843868ab54a6..8802063f2ac7 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java @@ -19,13 +19,20 @@ import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.SyncMode; +import io.airbyte.protocol.models.DestinationSyncMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.Map; +import java.util.TimeZone; import java.util.UUID; import java.util.function.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; public class AzureBlobStorageConsumer extends FailureTrackingAirbyteMessageConsumer { @@ -39,6 +46,8 @@ public class AzureBlobStorageConsumer extends FailureTrackingAirbyteMessageConsu private AirbyteMessage lastStateMessage = null; + public static final String YYYY_MM_DD_FORMAT_STRING = "yyyy_MM_dd"; + public AzureBlobStorageConsumer( final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig, final ConfiguredAirbyteCatalog configuredCatalog, @@ -67,15 +76,16 @@ protected void startTracked() throws Exception { for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { + final String blobName = configuredStream.getStream().getName() + "/" + + getOutputFilename(new Timestamp(System.currentTimeMillis())); final AppendBlobClient appendBlobClient = specializedBlobClientBuilder - .blobName(configuredStream.getStream().getName()) + .blobName(blobName) .buildAppendBlobClient(); - final boolean isNewlyCreatedBlob = createContainers(appendBlobClient, configuredStream); + createContainers(specializedBlobClientBuilder, appendBlobClient, configuredStream); final AzureBlobStorageWriter writer = writerFactory - .create(azureBlobStorageDestinationConfig, appendBlobClient, configuredStream, - isNewlyCreatedBlob); + .create(azureBlobStorageDestinationConfig, appendBlobClient, configuredStream); final AirbyteStream stream = configuredStream.getStream(); final AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair @@ -84,39 +94,30 @@ protected void startTracked() throws Exception { } } - private boolean createContainers(final AppendBlobClient appendBlobClient, - final ConfiguredAirbyteStream configuredStream) { + private void createContainers(final SpecializedBlobClientBuilder specializedBlobClientBuilder, + final AppendBlobClient appendBlobClient, + final ConfiguredAirbyteStream configuredStream) { // create container if absent (aka SQl Schema) final BlobContainerClient containerClient = appendBlobClient.getContainerClient(); if (!containerClient.exists()) { containerClient.create(); } - // create a storage container if absent (aka Table is SQL BD) - if (SyncMode.FULL_REFRESH.equals(configuredStream.getSyncMode())) { - // full refresh sync. Create blob and override if any + if (DestinationSyncMode.OVERWRITE.equals(configuredStream.getDestinationSyncMode())) { LOGGER.info("Sync mode is selected to OVERRIDE mode. New container will be automatically" - + " created or all data would be overridden (if any) for stream:" + configuredStream + + " created or all data would be overridden (if any) for stream:" + configuredStream .getStream().getName()); - appendBlobClient.create(true); - return true; - } else { - // incremental sync. Create new container only if still absent - if (!appendBlobClient.exists()) { - LOGGER.info("Sync mode is selected to APPEND mode. New container will be automatically" - + " created for stream:" + configuredStream.getStream().getName()); - appendBlobClient.create(false); - LOGGER.info(appendBlobClient.getBlobName() + " blob has been created"); - return true; - } else { - LOGGER.info(String.format( - "Sync mode is selected to APPEND mode. Container %s already exists. Append mode is " - + "only available for \"Append blobs\". For more details please visit" - + " https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#blobs", - configuredStream.getStream().getName())); - LOGGER.info(appendBlobClient.getBlobName() + " already exists"); - return false; - } + var blobItemList = StreamSupport.stream(containerClient.listBlobs().spliterator(), false) + .collect(Collectors.toList()); + blobItemList.forEach(blob -> { + if (!blob.isDeleted() && blob.getName().contains(configuredStream.getStream().getName() + "/")) { + final AppendBlobClient abc = specializedBlobClientBuilder + .blobName(blob.getName()) + .buildAppendBlobClient(); + abc.delete(); + } + }); } + appendBlobClient.create(true); } @Override @@ -161,4 +162,13 @@ protected void close(final boolean hasFailed) throws Exception { } } + private static String getOutputFilename(final Timestamp timestamp) { + final DateFormat formatter = new SimpleDateFormat(YYYY_MM_DD_FORMAT_STRING); + formatter.setTimeZone(TimeZone.getTimeZone("UTC")); + return String.format( + "%s_%d_0", + formatter.format(timestamp), + timestamp.getTime()); + } + } diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java index f31aaab64fa0..800cf582607e 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java @@ -10,17 +10,18 @@ import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.PrintWriter; -import java.nio.charset.StandardCharsets; -import java.util.UUID; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; import org.apache.commons.csv.QuoteMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + public class AzureBlobStorageCsvWriter extends BaseAzureBlobStorageWriter implements AzureBlobStorageWriter { @@ -32,8 +33,7 @@ public class AzureBlobStorageCsvWriter extends BaseAzureBlobStorageWriter implem public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config, final AppendBlobClient appendBlobClient, - final ConfiguredAirbyteStream configuredStream, - final boolean isNewlyCreatedBlob) + final ConfiguredAirbyteStream configuredStream) throws IOException { super(config, appendBlobClient, configuredStream); @@ -46,17 +46,9 @@ public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config, this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize()); - if (isNewlyCreatedBlob) { - this.csvPrinter = new CSVPrinter( - new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8), - CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL) - .withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0]))); - } else { - // no header required for append - this.csvPrinter = new CSVPrinter( - new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8), - CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL)); - } + this.csvPrinter = new CSVPrinter(new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8), + CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL) + .withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0]))); } @Override diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java index 77ed63b7dee6..874a2c280e90 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java @@ -37,8 +37,7 @@ public class AzureBlobStorageJsonlWriter extends BaseAzureBlobStorageWriter impl public AzureBlobStorageJsonlWriter(final AzureBlobStorageDestinationConfig config, final AppendBlobClient appendBlobClient, - final ConfiguredAirbyteStream configuredStream, - final boolean isNewlyCreatedBlob) { + final ConfiguredAirbyteStream configuredStream) { super(config, appendBlobClient, configuredStream); // at this moment we already receive appendBlobClient initialized this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize()); diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/AzureBlobStorageWriterFactory.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/AzureBlobStorageWriterFactory.java index 0b4ed34ac622..004506652f4c 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/AzureBlobStorageWriterFactory.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/AzureBlobStorageWriterFactory.java @@ -16,8 +16,7 @@ public interface AzureBlobStorageWriterFactory { AzureBlobStorageWriter create(AzureBlobStorageDestinationConfig config, AppendBlobClient appendBlobClient, - ConfiguredAirbyteStream configuredStream, - boolean isNewlyCreatedBlob) + ConfiguredAirbyteStream configuredStream) throws Exception; } diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/ProductionWriterFactory.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/ProductionWriterFactory.java index c4033098c4b2..a18dddfe98a7 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/ProductionWriterFactory.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/writer/ProductionWriterFactory.java @@ -20,21 +20,18 @@ public class ProductionWriterFactory implements AzureBlobStorageWriterFactory { @Override public AzureBlobStorageWriter create(final AzureBlobStorageDestinationConfig config, final AppendBlobClient appendBlobClient, - final ConfiguredAirbyteStream configuredStream, - final boolean isNewlyCreatedBlob) + final ConfiguredAirbyteStream configuredStream) throws Exception { final AzureBlobStorageFormat format = config.getFormatConfig().getFormat(); if (format == AzureBlobStorageFormat.CSV) { LOGGER.debug("Picked up CSV format writer"); - return new AzureBlobStorageCsvWriter(config, appendBlobClient, configuredStream, - isNewlyCreatedBlob); + return new AzureBlobStorageCsvWriter(config, appendBlobClient, configuredStream); } if (format == AzureBlobStorageFormat.JSONL) { LOGGER.debug("Picked up JSONL format writer"); - return new AzureBlobStorageJsonlWriter(config, appendBlobClient, configuredStream, - isNewlyCreatedBlob); + return new AzureBlobStorageJsonlWriter(config, appendBlobClient, configuredStream); } throw new RuntimeException("Unexpected AzureBlobStorage destination format: " + format); diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java index e1802e454783..ea7e8f27d3cb 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java @@ -4,13 +4,20 @@ package io.airbyte.integrations.destination.azure_blob_storage; +import com.azure.storage.blob.specialized.AppendBlobClient; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.JavaBaseConstants; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.csv.QuoteMode; + +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Reader; import java.io.StringReader; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -18,9 +25,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.stream.StreamSupport; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVRecord; -import org.apache.commons.csv.QuoteMode; public class AzureBlobStorageCsvDestinationAcceptanceTest extends AzureBlobStorageDestinationAcceptanceTest { @@ -103,4 +107,29 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, return jsonRecords; } + @Override + protected String getAllSyncedObjects(String streamName) { + try { + final List appendBlobClients = getAppendBlobClient(streamName); + StringBuilder result = new StringBuilder(); + for (AppendBlobClient appendBlobClient : appendBlobClients) { + if (result.isEmpty()) { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + appendBlobClient.download(outputStream); + result.append(outputStream.toString(StandardCharsets.UTF_8)); + } else { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + appendBlobClient.download(outputStream); + var stringStream = outputStream.toString(StandardCharsets.UTF_8); + result.append(stringStream.substring(stringStream.indexOf('\n') + 1)); + } + } + LOGGER.info("All objects: " + result); + return result.toString(); + } catch (Exception e) { + e.printStackTrace(); + return ""; + } + } + } diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationAcceptanceTest.java index e7894b1a4919..278e6e713b12 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationAcceptanceTest.java @@ -17,12 +17,15 @@ import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; -import java.io.ByteArrayOutputStream; -import java.nio.charset.StandardCharsets; -import java.nio.file.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + public abstract class AzureBlobStorageDestinationAcceptanceTest extends DestinationAcceptanceTest { protected static final Logger LOGGER = LoggerFactory @@ -67,19 +70,29 @@ protected JsonNode getFailCheckConfig() { /** * Helper method to retrieve all synced objects inside the configured bucket path. */ - @Deprecated - protected String getAllSyncedObjects(final String streamName) { - final AppendBlobClient appendBlobClient = specializedBlobClientBuilder - .blobName(streamName) - .buildAppendBlobClient(); - - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - appendBlobClient.download(outputStream); - final String result = new String(outputStream.toByteArray(), StandardCharsets.UTF_8); - - LOGGER.info("All objects: " + result); - return result; - + protected abstract String getAllSyncedObjects(final String streamName); + + protected List getAppendBlobClient(final String streamName) throws Exception { + final AppendBlobClient streamAppendBlobClient = specializedBlobClientBuilder + .blobName(streamName) + .buildAppendBlobClient(); + + final BlobContainerClient containerClient = streamAppendBlobClient.getContainerClient(); + var blobItemList = StreamSupport.stream(containerClient.listBlobs().spliterator(), false) + .collect(Collectors.toList()); + var filteredBlobList = blobItemList.stream() + .filter(blob -> blob.getName().contains(streamName + "/")).collect(Collectors.toList()); + if (!filteredBlobList.isEmpty()) { + List clobClientList = new ArrayList<>(); + filteredBlobList.forEach(blobItem -> { + clobClientList.add(specializedBlobClientBuilder.blobName(blobItem.getName()).buildAppendBlobClient()); + }); + return clobClientList; + } else { + var errorText = String.format("Can not find blob started with: %s/", streamName); + LOGGER.error(errorText); + throw new Exception(errorText); + } } protected abstract JsonNode getFormatConfig(); diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageJsonlDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageJsonlDestinationAcceptanceTest.java index 300683b7f412..2922434ed140 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageJsonlDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageJsonlDestinationAcceptanceTest.java @@ -4,10 +4,14 @@ package io.airbyte.integrations.destination.azure_blob_storage; +import com.azure.storage.blob.specialized.AppendBlobClient; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.JavaBaseConstants; + +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.LinkedList; import java.util.List; @@ -41,4 +45,21 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, return jsonRecords; } + @Override + protected String getAllSyncedObjects(String streamName) { + try { + final List appendBlobClients = getAppendBlobClient(streamName); + StringBuilder result = new StringBuilder(); + for (AppendBlobClient appendBlobClient : appendBlobClients) { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + appendBlobClient.download(outputStream); + result.append(outputStream.toString(StandardCharsets.UTF_8)); + } + LOGGER.info("All objects: " + result); + return result.toString(); + } catch (Exception e) { + e.printStackTrace(); + return ""; + } + } } From 99d77ad3b5c2a6967399b93c3bc185c3bf1a85bc Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 21 Jan 2022 10:09:55 +0200 Subject: [PATCH 2/6] updated azure blob storage documentation --- docs/integrations/destinations/azureblobstorage.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/integrations/destinations/azureblobstorage.md b/docs/integrations/destinations/azureblobstorage.md index 94809438f453..64067c357d2e 100644 --- a/docs/integrations/destinations/azureblobstorage.md +++ b/docs/integrations/destinations/azureblobstorage.md @@ -137,7 +137,8 @@ They will be like this in the output file: | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | -| 0.1.1 | 2021-12-29 | [\#5332](https://github.com/airbytehq/airbyte/pull/9190) | Added BufferedOutputStream wrapper to blob output stream to improve performance and fix issues with 50,000 block limit. Also disabled autoflush on PrintWriter. | +| 0.1.2 | 2022-01-20 | [\#9682](https://github.com/airbytehq/airbyte/pull/9682) | Each data synchronization for each thread is written to a new blob to the folder with stream name. | +| 0.1.1 | 2021-12-29 | [\#9190](https://github.com/airbytehq/airbyte/pull/9190) | Added BufferedOutputStream wrapper to blob output stream to improve performance and fix issues with 50,000 block limit. Also disabled autoflush on PrintWriter. | | 0.1.0 | 2021-08-30 | [\#5332](https://github.com/airbytehq/airbyte/pull/5332) | Initial release with JSONL and CSV output. | From 1f3938e4f76be64a0988626ad89c022ff50429c3 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 21 Jan 2022 10:46:03 +0200 Subject: [PATCH 3/6] fix remarks --- .../azure_blob_storage/AzureBlobStorageConsumer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java index 8802063f2ac7..9b01d03de6aa 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java @@ -37,6 +37,7 @@ public class AzureBlobStorageConsumer extends FailureTrackingAirbyteMessageConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(AzureBlobStorageConsumer.class); + private static final String YYYY_MM_DD_FORMAT_STRING = "yyyy_MM_dd"; private final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig; private final ConfiguredAirbyteCatalog configuredCatalog; @@ -46,8 +47,6 @@ public class AzureBlobStorageConsumer extends FailureTrackingAirbyteMessageConsu private AirbyteMessage lastStateMessage = null; - public static final String YYYY_MM_DD_FORMAT_STRING = "yyyy_MM_dd"; - public AzureBlobStorageConsumer( final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig, final ConfiguredAirbyteCatalog configuredCatalog, From bccc9c9294538c6044334d42bf08fa02c317a90a Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 21 Jan 2022 10:51:39 +0200 Subject: [PATCH 4/6] fix remarks --- .../AzureBlobStorageCsvDestinationAcceptanceTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java index ea7e8f27d3cb..2e51e1126ccd 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java @@ -113,13 +113,11 @@ protected String getAllSyncedObjects(String streamName) { final List appendBlobClients = getAppendBlobClient(streamName); StringBuilder result = new StringBuilder(); for (AppendBlobClient appendBlobClient : appendBlobClients) { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + appendBlobClient.download(outputStream); if (result.isEmpty()) { - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - appendBlobClient.download(outputStream); result.append(outputStream.toString(StandardCharsets.UTF_8)); } else { - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - appendBlobClient.download(outputStream); var stringStream = outputStream.toString(StandardCharsets.UTF_8); result.append(stringStream.substring(stringStream.indexOf('\n') + 1)); } From c87117fd928bb2e184b96ee77294ac96861d5ca9 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 21 Jan 2022 11:09:55 +0200 Subject: [PATCH 5/6] fix remarks and format code --- .../AzureBlobStorageConsumer.java | 21 +++++++++---------- .../csv/AzureBlobStorageCsvWriter.java | 17 +++++++-------- ...obStorageCsvDestinationAcceptanceTest.java | 9 ++++---- ...eBlobStorageDestinationAcceptanceTest.java | 13 ++++++------ ...StorageJsonlDestinationAcceptanceTest.java | 4 ++-- 5 files changed, 30 insertions(+), 34 deletions(-) diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java index 9b01d03de6aa..fdd0364a21f0 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageConsumer.java @@ -20,9 +20,6 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.sql.Timestamp; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -33,6 +30,8 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AzureBlobStorageConsumer extends FailureTrackingAirbyteMessageConsumer { @@ -76,7 +75,7 @@ protected void startTracked() throws Exception { for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) { final String blobName = configuredStream.getStream().getName() + "/" + - getOutputFilename(new Timestamp(System.currentTimeMillis())); + getOutputFilename(new Timestamp(System.currentTimeMillis())); final AppendBlobClient appendBlobClient = specializedBlobClientBuilder .blobName(blobName) .buildAppendBlobClient(); @@ -103,15 +102,15 @@ private void createContainers(final SpecializedBlobClientBuilder specializedBlob } if (DestinationSyncMode.OVERWRITE.equals(configuredStream.getDestinationSyncMode())) { LOGGER.info("Sync mode is selected to OVERRIDE mode. New container will be automatically" - + " created or all data would be overridden (if any) for stream:" + configuredStream + + " created or all data would be overridden (if any) for stream:" + configuredStream .getStream().getName()); var blobItemList = StreamSupport.stream(containerClient.listBlobs().spliterator(), false) - .collect(Collectors.toList()); + .collect(Collectors.toList()); blobItemList.forEach(blob -> { if (!blob.isDeleted() && blob.getName().contains(configuredStream.getStream().getName() + "/")) { final AppendBlobClient abc = specializedBlobClientBuilder - .blobName(blob.getName()) - .buildAppendBlobClient(); + .blobName(blob.getName()) + .buildAppendBlobClient(); abc.delete(); } }); @@ -165,9 +164,9 @@ private static String getOutputFilename(final Timestamp timestamp) { final DateFormat formatter = new SimpleDateFormat(YYYY_MM_DD_FORMAT_STRING); formatter.setTimeZone(TimeZone.getTimeZone("UTC")); return String.format( - "%s_%d_0", - formatter.format(timestamp), - timestamp.getTime()); + "%s_%d_0", + formatter.format(timestamp), + timestamp.getTime()); } } diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java index 800cf582607e..d267bf4d5325 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java @@ -10,17 +10,16 @@ import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteStream; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVPrinter; -import org.apache.commons.csv.QuoteMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.BufferedOutputStream; import java.io.IOException; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; import java.util.UUID; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.csv.QuoteMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AzureBlobStorageCsvWriter extends BaseAzureBlobStorageWriter implements AzureBlobStorageWriter { @@ -46,9 +45,9 @@ public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config, this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize()); - this.csvPrinter = new CSVPrinter(new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8), - CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL) - .withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0]))); + final PrintWriter printWriter = new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8); + this.csvPrinter = new CSVPrinter(printWriter, CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL) + .withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0]))); } @Override diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java index 2e51e1126ccd..904d55ae44dc 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageCsvDestinationAcceptanceTest.java @@ -9,10 +9,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.JavaBaseConstants; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVRecord; -import org.apache.commons.csv.QuoteMode; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Reader; @@ -25,6 +21,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.stream.StreamSupport; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.csv.QuoteMode; public class AzureBlobStorageCsvDestinationAcceptanceTest extends AzureBlobStorageDestinationAcceptanceTest { @@ -125,7 +124,7 @@ protected String getAllSyncedObjects(String streamName) { LOGGER.info("All objects: " + result); return result.toString(); } catch (Exception e) { - e.printStackTrace(); + LOGGER.error("No blobs were found for stream with name {}.", streamName); return ""; } } diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationAcceptanceTest.java index 278e6e713b12..9226e1b0a025 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationAcceptanceTest.java @@ -17,14 +17,13 @@ import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AzureBlobStorageDestinationAcceptanceTest extends DestinationAcceptanceTest { @@ -74,14 +73,14 @@ protected JsonNode getFailCheckConfig() { protected List getAppendBlobClient(final String streamName) throws Exception { final AppendBlobClient streamAppendBlobClient = specializedBlobClientBuilder - .blobName(streamName) - .buildAppendBlobClient(); + .blobName(streamName) + .buildAppendBlobClient(); final BlobContainerClient containerClient = streamAppendBlobClient.getContainerClient(); var blobItemList = StreamSupport.stream(containerClient.listBlobs().spliterator(), false) - .collect(Collectors.toList()); + .collect(Collectors.toList()); var filteredBlobList = blobItemList.stream() - .filter(blob -> blob.getName().contains(streamName + "/")).collect(Collectors.toList()); + .filter(blob -> blob.getName().contains(streamName + "/")).collect(Collectors.toList()); if (!filteredBlobList.isEmpty()) { List clobClientList = new ArrayList<>(); filteredBlobList.forEach(blobItem -> { diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageJsonlDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageJsonlDestinationAcceptanceTest.java index 2922434ed140..cf7f89903d41 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageJsonlDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/test-integration/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageJsonlDestinationAcceptanceTest.java @@ -8,7 +8,6 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.JavaBaseConstants; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -58,8 +57,9 @@ protected String getAllSyncedObjects(String streamName) { LOGGER.info("All objects: " + result); return result.toString(); } catch (Exception e) { - e.printStackTrace(); + LOGGER.error("No blobs were found for stream with name {}.", streamName); return ""; } } + } From e28efcfa2487951610ac3fcd74a98bffed3ce9bf Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Mon, 24 Jan 2022 18:26:43 +0200 Subject: [PATCH 6/6] updated doc --- docs/integrations/destinations/azureblobstorage.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/destinations/azureblobstorage.md b/docs/integrations/destinations/azureblobstorage.md index 64067c357d2e..13615af56619 100644 --- a/docs/integrations/destinations/azureblobstorage.md +++ b/docs/integrations/destinations/azureblobstorage.md @@ -137,7 +137,7 @@ They will be like this in the output file: | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | -| 0.1.2 | 2022-01-20 | [\#9682](https://github.com/airbytehq/airbyte/pull/9682) | Each data synchronization for each thread is written to a new blob to the folder with stream name. | +| 0.1.2 | 2022-01-20 | [\#9682](https://github.com/airbytehq/airbyte/pull/9682) | Each data synchronization for each stream is written to a new blob to the folder with stream name. | | 0.1.1 | 2021-12-29 | [\#9190](https://github.com/airbytehq/airbyte/pull/9190) | Added BufferedOutputStream wrapper to blob output stream to improve performance and fix issues with 50,000 block limit. Also disabled autoflush on PrintWriter. | | 0.1.0 | 2021-08-30 | [\#5332](https://github.com/airbytehq/airbyte/pull/5332) | Initial release with JSONL and CSV output. |