Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Updated azure blob storage destination #10277

Merged
merged 7 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "b4c5d105-31fd-4817-96b6-cb923bfc04cb",
"name": "Azure Blob Storage",
"dockerRepository": "airbyte/destination-azure-blob-storage",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/azureblobstorage",
"icon": "azureblobstorage.svg"
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file is generated by io.airbyte.config.specs.SeedConnectorSpecGenerator.
# Do NOT edit this file directly. See generator class for more details.
---
- dockerImage: "airbyte/destination-azure-blob-storage:0.1.1"
- dockerImage: "airbyte/destination-azure-blob-storage:0.1.2"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/azureblobstorage"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-azure-blob-storage

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/destination-azure-blob-storage
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,24 @@
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AzureBlobStorageConsumer extends FailureTrackingAirbyteMessageConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(AzureBlobStorageConsumer.class);
private static final String YYYY_MM_DD_FORMAT_STRING = "yyyy_MM_dd";

private final AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig;
private final ConfiguredAirbyteCatalog configuredCatalog;
Expand Down Expand Up @@ -67,15 +74,16 @@ protected void startTracked() throws Exception {

for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) {

final String blobName = configuredStream.getStream().getName() + "/" +
getOutputFilename(new Timestamp(System.currentTimeMillis()));
final AppendBlobClient appendBlobClient = specializedBlobClientBuilder
.blobName(configuredStream.getStream().getName())
.blobName(blobName)
.buildAppendBlobClient();

final boolean isNewlyCreatedBlob = createContainers(appendBlobClient, configuredStream);
createContainers(specializedBlobClientBuilder, appendBlobClient, configuredStream);

final AzureBlobStorageWriter writer = writerFactory
.create(azureBlobStorageDestinationConfig, appendBlobClient, configuredStream,
isNewlyCreatedBlob);
.create(azureBlobStorageDestinationConfig, appendBlobClient, configuredStream);

final AirbyteStream stream = configuredStream.getStream();
final AirbyteStreamNameNamespacePair streamNamePair = AirbyteStreamNameNamespacePair
Expand All @@ -84,39 +92,30 @@ protected void startTracked() throws Exception {
}
}

private boolean createContainers(final AppendBlobClient appendBlobClient,
final ConfiguredAirbyteStream configuredStream) {
private void createContainers(final SpecializedBlobClientBuilder specializedBlobClientBuilder,
final AppendBlobClient appendBlobClient,
final ConfiguredAirbyteStream configuredStream) {
// create container if absent (aka SQl Schema)
final BlobContainerClient containerClient = appendBlobClient.getContainerClient();
if (!containerClient.exists()) {
containerClient.create();
}
// create a storage container if absent (aka Table is SQL BD)
if (SyncMode.FULL_REFRESH.equals(configuredStream.getSyncMode())) {
// full refresh sync. Create blob and override if any
if (DestinationSyncMode.OVERWRITE.equals(configuredStream.getDestinationSyncMode())) {
LOGGER.info("Sync mode is selected to OVERRIDE mode. New container will be automatically"
+ " created or all data would be overridden (if any) for stream:" + configuredStream
.getStream().getName());
appendBlobClient.create(true);
return true;
} else {
// incremental sync. Create new container only if still absent
if (!appendBlobClient.exists()) {
LOGGER.info("Sync mode is selected to APPEND mode. New container will be automatically"
+ " created for stream:" + configuredStream.getStream().getName());
appendBlobClient.create(false);
LOGGER.info(appendBlobClient.getBlobName() + " blob has been created");
return true;
} else {
LOGGER.info(String.format(
"Sync mode is selected to APPEND mode. Container %s already exists. Append mode is "
+ "only available for \"Append blobs\". For more details please visit"
+ " https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#blobs",
configuredStream.getStream().getName()));
LOGGER.info(appendBlobClient.getBlobName() + " already exists");
return false;
}
var blobItemList = StreamSupport.stream(containerClient.listBlobs().spliterator(), false)
.collect(Collectors.toList());
blobItemList.forEach(blob -> {
if (!blob.isDeleted() && blob.getName().contains(configuredStream.getStream().getName() + "/")) {
final AppendBlobClient abc = specializedBlobClientBuilder
.blobName(blob.getName())
.buildAppendBlobClient();
abc.delete();
}
});
}
appendBlobClient.create(true);
}

@Override
Expand Down Expand Up @@ -161,4 +160,13 @@ protected void close(final boolean hasFailed) throws Exception {
}
}

private static String getOutputFilename(final Timestamp timestamp) {
final DateFormat formatter = new SimpleDateFormat(YYYY_MM_DD_FORMAT_STRING);
formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
return String.format(
"%s_%d_0",
formatter.format(timestamp),
timestamp.getTime());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ public class AzureBlobStorageCsvWriter extends BaseAzureBlobStorageWriter implem

public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config,
final AppendBlobClient appendBlobClient,
final ConfiguredAirbyteStream configuredStream,
final boolean isNewlyCreatedBlob)
final ConfiguredAirbyteStream configuredStream)
throws IOException {
super(config, appendBlobClient, configuredStream);

Expand All @@ -46,17 +45,9 @@ public AzureBlobStorageCsvWriter(final AzureBlobStorageDestinationConfig config,

this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize());

if (isNewlyCreatedBlob) {
this.csvPrinter = new CSVPrinter(
new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8),
CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL)
.withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0])));
} else {
// no header required for append
this.csvPrinter = new CSVPrinter(
new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8),
CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL));
}
final PrintWriter printWriter = new PrintWriter(blobOutputStream, false, StandardCharsets.UTF_8);
this.csvPrinter = new CSVPrinter(printWriter, CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL)
.withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0])));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ public class AzureBlobStorageJsonlWriter extends BaseAzureBlobStorageWriter impl

public AzureBlobStorageJsonlWriter(final AzureBlobStorageDestinationConfig config,
final AppendBlobClient appendBlobClient,
final ConfiguredAirbyteStream configuredStream,
final boolean isNewlyCreatedBlob) {
final ConfiguredAirbyteStream configuredStream) {
super(config, appendBlobClient, configuredStream);
// at this moment we already receive appendBlobClient initialized
this.blobOutputStream = new BufferedOutputStream(appendBlobClient.getBlobOutputStream(), config.getOutputStreamBufferSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ public interface AzureBlobStorageWriterFactory {

AzureBlobStorageWriter create(AzureBlobStorageDestinationConfig config,
AppendBlobClient appendBlobClient,
ConfiguredAirbyteStream configuredStream,
boolean isNewlyCreatedBlob)
ConfiguredAirbyteStream configuredStream)
throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,18 @@ public class ProductionWriterFactory implements AzureBlobStorageWriterFactory {
@Override
public AzureBlobStorageWriter create(final AzureBlobStorageDestinationConfig config,
final AppendBlobClient appendBlobClient,
final ConfiguredAirbyteStream configuredStream,
final boolean isNewlyCreatedBlob)
final ConfiguredAirbyteStream configuredStream)
throws Exception {
final AzureBlobStorageFormat format = config.getFormatConfig().getFormat();

if (format == AzureBlobStorageFormat.CSV) {
LOGGER.debug("Picked up CSV format writer");
return new AzureBlobStorageCsvWriter(config, appendBlobClient, configuredStream,
isNewlyCreatedBlob);
return new AzureBlobStorageCsvWriter(config, appendBlobClient, configuredStream);
}

if (format == AzureBlobStorageFormat.JSONL) {
LOGGER.debug("Picked up JSONL format writer");
return new AzureBlobStorageJsonlWriter(config, appendBlobClient, configuredStream,
isNewlyCreatedBlob);
return new AzureBlobStorageJsonlWriter(config, appendBlobClient, configuredStream);
}

throw new RuntimeException("Unexpected AzureBlobStorage destination format: " + format);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

package io.airbyte.integrations.destination.azure_blob_storage;

import com.azure.storage.blob.specialized.AppendBlobClient;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.JavaBaseConstants;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
Expand Down Expand Up @@ -103,4 +106,27 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
return jsonRecords;
}

@Override
protected String getAllSyncedObjects(String streamName) {
try {
final List<AppendBlobClient> appendBlobClients = getAppendBlobClient(streamName);
StringBuilder result = new StringBuilder();
for (AppendBlobClient appendBlobClient : appendBlobClients) {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
appendBlobClient.download(outputStream);
if (result.isEmpty()) {
result.append(outputStream.toString(StandardCharsets.UTF_8));
} else {
var stringStream = outputStream.toString(StandardCharsets.UTF_8);
result.append(stringStream.substring(stringStream.indexOf('\n') + 1));
}
}
LOGGER.info("All objects: " + result);
return result.toString();
} catch (Exception e) {
LOGGER.error("No blobs were found for stream with name {}.", streamName);
return "";
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,19 +69,29 @@ protected JsonNode getFailCheckConfig() {
/**
* Helper method to retrieve all synced objects inside the configured bucket path.
*/
@Deprecated
protected String getAllSyncedObjects(final String streamName) {
final AppendBlobClient appendBlobClient = specializedBlobClientBuilder
protected abstract String getAllSyncedObjects(final String streamName);

protected List<AppendBlobClient> getAppendBlobClient(final String streamName) throws Exception {
final AppendBlobClient streamAppendBlobClient = specializedBlobClientBuilder
.blobName(streamName)
.buildAppendBlobClient();

final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
appendBlobClient.download(outputStream);
final String result = new String(outputStream.toByteArray(), StandardCharsets.UTF_8);

LOGGER.info("All objects: " + result);
return result;

final BlobContainerClient containerClient = streamAppendBlobClient.getContainerClient();
var blobItemList = StreamSupport.stream(containerClient.listBlobs().spliterator(), false)
.collect(Collectors.toList());
var filteredBlobList = blobItemList.stream()
.filter(blob -> blob.getName().contains(streamName + "/")).collect(Collectors.toList());
if (!filteredBlobList.isEmpty()) {
List<AppendBlobClient> clobClientList = new ArrayList<>();
filteredBlobList.forEach(blobItem -> {
clobClientList.add(specializedBlobClientBuilder.blobName(blobItem.getName()).buildAppendBlobClient());
});
return clobClientList;
} else {
var errorText = String.format("Can not find blob started with: %s/", streamName);
LOGGER.error(errorText);
throw new Exception(errorText);
}
}

protected abstract JsonNode getFormatConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

package io.airbyte.integrations.destination.azure_blob_storage;

import com.azure.storage.blob.specialized.AppendBlobClient;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.JavaBaseConstants;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;

Expand Down Expand Up @@ -41,4 +44,22 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
return jsonRecords;
}

@Override
protected String getAllSyncedObjects(String streamName) {
try {
final List<AppendBlobClient> appendBlobClients = getAppendBlobClient(streamName);
StringBuilder result = new StringBuilder();
for (AppendBlobClient appendBlobClient : appendBlobClients) {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
appendBlobClient.download(outputStream);
result.append(outputStream.toString(StandardCharsets.UTF_8));
}
LOGGER.info("All objects: " + result);
return result.toString();
} catch (Exception e) {
LOGGER.error("No blobs were found for stream with name {}.", streamName);
return "";
}
}

}
3 changes: 2 additions & 1 deletion docs/integrations/destinations/azureblobstorage.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ They will be like this in the output file:

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.1 | 2021-12-29 | [\#5332](https://github.com/airbytehq/airbyte/pull/9190) | Added BufferedOutputStream wrapper to blob output stream to improve performance and fix issues with 50,000 block limit. Also disabled autoflush on PrintWriter. |
| 0.1.2 | 2022-01-20 | [\#9682](https://github.com/airbytehq/airbyte/pull/9682) | Each data synchronization for each stream is written to a new blob to the folder with stream name. |
| 0.1.1 | 2021-12-29 | [\#9190](https://github.com/airbytehq/airbyte/pull/9190) | Added BufferedOutputStream wrapper to blob output stream to improve performance and fix issues with 50,000 block limit. Also disabled autoflush on PrintWriter. |
| 0.1.0 | 2021-08-30 | [\#5332](https://github.com/airbytehq/airbyte/pull/5332) | Initial release with JSONL and CSV output. |