Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[destination-azure-blob-storage] File Extensions added for the output files #27701 #38061

Merged
merged 22 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6e961da
Destination AzureBlobStorage : Adding file extensions
richa-rochna Jun 23, 2023
85893f4
Destination AzureBlobStorage : Adding file extensions
richa-rochna Jun 26, 2023
0f26990
Merge branch 'master' into destination-azure-blob-storage
richa-rochna Jun 26, 2023
137392e
Destination AzureBlobStorage : Documentation update
richa-rochna Jun 26, 2023
a8808f2
Merge remote-tracking branch 'origin/destination-azure-blob-storage' …
richa-rochna Jun 26, 2023
913e30f
Destination AzureBlobStorage : Documentation update
richa-rochna Jun 26, 2023
a407785
Destination AzureBlobStorage : File extensions added for jsonl files
richa-rochna Jul 3, 2023
5de4cca
merge master
marcosmarxm May 8, 2024
486f50c
Merge branch 'master' into marcos/destination-azure-blob-files
marcosmarxm Jun 12, 2024
f1c1aa1
fix conflicts and bad merge
marcosmarxm Jun 12, 2024
ab044b8
update doc changelog
marcosmarxm Jun 12, 2024
2987886
get value from config robust
marcosmarxm Jun 12, 2024
67ec4ee
add for json format config
marcosmarxm Jun 12, 2024
d0f3305
format files
marcosmarxm Jun 12, 2024
0db3818
disable test
marcosmarxm Jun 12, 2024
a96aa91
Merge branch 'master' into marcos/destination-azure-blob-files
marcosmarxm Jun 12, 2024
233a700
format files
marcosmarxm Jun 12, 2024
07cc061
Merge branch 'master' into marcos/destination-azure-blob-files
marcosmarxm Jun 13, 2024
7f27b30
Update AzureBlobStorageSpillTest.java
marcosmarxm Jun 13, 2024
d9122cc
chore: auto-fix lint and format issues
octavia-squidington-iii Jun 13, 2024
396c0d6
tests
marcosmarxm Jun 14, 2024
e51ec56
fix imports
marcosmarxm Jun 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: b4c5d105-31fd-4817-96b6-cb923bfc04cb
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
dockerRepository: airbyte/destination-azure-blob-storage
githubIssueLabel: destination-azure-blob-storage
icon: azureblobstorage.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,18 @@ protected void startTracked() throws Exception {

for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) {

final String blobName = configuredStream.getStream().getName() + "/" +
getOutputFilename(new Timestamp(System.currentTimeMillis()));
StringBuilder blobNameSb = new StringBuilder()
.append(configuredStream.getStream().getName())
.append("/")
.append(getOutputFilename(new Timestamp(System.currentTimeMillis())));

if (azureBlobStorageDestinationConfig.getFormatConfig().isFileExtensionRequired()) {
blobNameSb
.append(".")
.append(azureBlobStorageDestinationConfig.getFormatConfig().getFormat().getFileExtension());
}
String blobName = blobNameSb.toString();

final AppendBlobClient appendBlobClient = specializedBlobClientBuilder
.blobName(blobName)
.buildAppendBlobClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

public interface AzureBlobStorageFormatConfig {

boolean isFileExtensionRequired();

AzureBlobStorageFormat getFormat();

static String withDefault(final JsonNode config, final String property, final String defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static AzureBlobStorageFormatConfig getAzureBlobStorageFormatConfig(final
return new AzureBlobStorageCsvFormatConfig(formatConfig);
}
case JSONL -> {
return new AzureBlobStorageJsonlFormatConfig();
return new AzureBlobStorageJsonlFormatConfig(formatConfig);
}
default -> throw new RuntimeException("Unexpected output format: " + Jsons.serialize(config));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ public String getValue() {
}

private final Flattening flattening;
private final boolean fileExtensionRequired;

public AzureBlobStorageCsvFormatConfig(final JsonNode formatConfig) {
this.flattening = Flattening.fromValue(formatConfig.get("flattening").asText());
this.fileExtensionRequired = formatConfig.has("file_extension") ? formatConfig.get("file_extension").asBoolean() : false;
}

public boolean isFileExtensionRequired() {
return fileExtensionRequired;
}

@Override
Expand All @@ -58,6 +64,7 @@ public Flattening getFlattening() {
public String toString() {
return "AzureBlobStorageCsvFormatConfig{" +
"flattening=" + flattening +
", fileExtensionRequired=" + fileExtensionRequired +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,33 @@

package io.airbyte.integrations.destination.azure_blob_storage.jsonl;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageFormat;
import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageFormatConfig;

public class AzureBlobStorageJsonlFormatConfig implements AzureBlobStorageFormatConfig {

private final boolean fileExtensionRequired;

public AzureBlobStorageJsonlFormatConfig(final JsonNode formatConfig) {
this.fileExtensionRequired = formatConfig.has("file_extension") ? formatConfig.get("file_extension").asBoolean() : false;
}

@Override
public boolean isFileExtensionRequired() {
return fileExtensionRequired;
}

@Override
public AzureBlobStorageFormat getFormat() {
return AzureBlobStorageFormat.JSONL;
}

@Override
public String toString() {
return "AzureBlobStorageJsonlFormatConfig{" +
"fileExtensionRequired=" + fileExtensionRequired +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@
"description": "Whether the input json data should be normalized (flattened) in the output CSV. Please refer to docs for details.",
"default": "No flattening",
"enum": ["No flattening", "Root level flattening"]
},
"file_extension": {
"title": "File Extension",
"type": "boolean",
"default": false,
"description": "Add file extensions to the output file."
}
}
},
Expand All @@ -88,6 +94,12 @@
"format_type": {
"type": "string",
"const": "JSONL"
},
"file_extension": {
"title": "File Extension",
"type": "boolean",
"default": false,
"description": "Add file extensions to the output file."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.azure_blob_storage.jsonl.AzureBlobStorageJsonlFormatConfig;
import io.airbyte.integrations.destination.azure_blob_storage.writer.ProductionWriterFactory;
Expand Down Expand Up @@ -43,6 +46,8 @@ public class AzureBlobStorageSpillTest {

private BlobContainerClient blobContainerClient;

private static final ObjectMapper mapper = MoreMappers.initMapper();

@BeforeEach
void setup() {
azureBlobStorageContainer = new AzureBlobStorageContainer().withExposedPorts(10000);
Expand Down Expand Up @@ -97,14 +102,19 @@ void testSpillBlobWithExceedingSize() throws Exception {
}

private static AzureBlobStorageDestinationConfig createConfig(String host, Integer mappedPort) {
final ObjectNode stubFormatConfig = mapper.createObjectNode();
stubFormatConfig.put("file_extension", Boolean.TRUE);
final ObjectNode stubConfig = mapper.createObjectNode();
stubConfig.set("format", stubFormatConfig);

return new AzureBlobStorageDestinationConfig(
"http://" + host + ":" + mappedPort + "/devstoreaccount1",
"devstoreaccount1",
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",
"container-name",
1,
1,
new AzureBlobStorageJsonlFormatConfig());
new AzureBlobStorageJsonlFormatConfig(stubConfig));
}

private static AirbyteMessage createAirbyteMessage(JsonNode data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ void testConfigBlobStorageSpillSize() {

private JsonNode getFormatConfig() {
return Jsons.deserialize("{\n"
+ " \"format_type\": \"JSONL\"\n"
+ " \"format_type\": \"JSONL\",\n"
+ " \"file_extension\": \"true\"\n"
+ "}");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public void testGetCsvS3FormatConfig() {
final ObjectNode stubFormatConfig = mapper.createObjectNode();
stubFormatConfig.put("format_type", AzureBlobStorageFormat.CSV.toString());
stubFormatConfig.put("flattening", Flattening.ROOT_LEVEL.getValue());
stubFormatConfig.put("file_extension", Boolean.TRUE);

final ObjectNode stubConfig = mapper.createObjectNode();
stubConfig.set("format", stubFormatConfig);
Expand Down
37 changes: 22 additions & 15 deletions docs/integrations/destinations/azure-blob-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ With root level normalization, the output CSV is:
| :------------------------------------- | :-------------------- | :-------- | :----------------------------------- |
| `26d73cde-7eb1-4e1e-b7db-a4c03b4cf206` | 1622135805000 | 123 | `{ "first": "John", "last": "Doe" }` |

With the field `File Extension`, it is possible to save the output files with extension. It is an optional field with default value as `false`. Enable this to store the files with `csv` extension.

### JSON Lines \(JSONL\)

[Json Lines](https://jsonlines.org/) is a text format with one JSON per line. Each line has a structure as follows:
Expand Down Expand Up @@ -115,6 +117,8 @@ They will be like this in the output file:
{ "_airbyte_ab_id": "0a61de1b-9cdd-4455-a739-93572c9a5f20", "_airbyte_emitted_at": "1631948170000", "_airbyte_data": { "user_id": 456, "name": { "first": "Jane", "last": "Roe" } } }
```

With the field `File Extension`, it is possible to save the output files with extension. It is an optional field with default value as `false`. Enable this to store the files with `jsonl` extension.

## Getting started

### Requirements
Expand All @@ -124,28 +128,31 @@ They will be like this in the output file:

### Setup guide

- Fill up AzureBlobStorage info
- **Endpoint Domain Name**
- Leave default value \(or leave it empty if run container from command line\) to use Microsoft native one or use your own.
- **Azure blob storage container**
- If not exists - will be created automatically. If leave empty, then will be created automatically airbytecontainer+timestamp..
- **Azure Blob Storage account name**
- See [this](https://docs.microsoft.com/en-us/azure/storage/common/storage-account-create?tabs=azure-portal) on how to create an account.
- **The Azure blob storage account key**
- Corresponding key to the above user.
- **Format**
- Data format that will be use for a migrated data representation in blob.
- Make sure your user has access to Azure from the machine running Airbyte.
- This depends on your networking setup.
- The easiest way to verify if Airbyte is able to connect to your Azure blob storage container is via the check connection tool in the UI.
* Fill up AzureBlobStorage info
* **Endpoint Domain Name**
* Leave default value \(or leave it empty if run container from command line\) to use Microsoft native one or use your own.
* **Azure blob storage container**
* If not exists - will be created automatically. If leave empty, then will be created automatically airbytecontainer+timestamp..
* **Azure Blob Storage account name**
* See [this](https://docs.microsoft.com/en-us/azure/storage/common/storage-account-create?tabs=azure-portal) on how to create an account.
* **The Azure blob storage account key**
* Corresponding key to the above user.
* **Format**
* Data format that will be use for a migrated data representation in blob.
* With the field **File Extension**, it is possible to save the output files with extension. It is an optional field with default value as `false`. Enable this to store the files with extension.
* Make sure your user has access to Azure from the machine running Airbyte.
* This depends on your networking setup.
* The easiest way to verify if Airbyte is able to connect to your Azure blob storage container is via the check connection tool in the UI.


## Changelog

<details>
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.2.2 | 2024-06-12 | [\#38061](https://github.com/airbytehq/airbyte/pull/38061) | File Extensions added for the output files |
| 0.2.1 | 2023-09-13 | [\#30412](https://github.com/airbytehq/airbyte/pull/30412) | Switch noisy logging to debug |
| 0.2.0 | 2023-01-18 | [\#21467](https://github.com/airbytehq/airbyte/pull/21467) | Support spilling of objects exceeding configured size threshold |
| 0.1.6 | 2022-08-08 | [\#15318](https://github.com/airbytehq/airbyte/pull/15318) | Support per-stream state |
Expand Down
Loading