Format and generate seed resources. (#8493)
davinchia authored Dec 3, 2021
1 parent 341f505 commit bc230ee
Showing 10 changed files with 64 additions and 42 deletions.
@@ -3109,7 +3109,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.3.17"
- dockerImage: "airbyte/destination-snowflake:0.3.18"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
connectionSpecification:
@@ -3182,19 +3182,19 @@
description: "Loading method used to send data to Snowflake."
order: 7
oneOf:
- title: "Standard Inserts"
- title: "[Recommended] Internal Staging"
additionalProperties: false
description: "Uses <pre>INSERT</pre> statements to send batches of records\
\ to Snowflake. Easiest (no setup) but not recommended for large production\
\ workloads due to slow speed."
description: "Writes large batches of records to a file, uploads the file\
\ to Snowflake, then uses <pre>COPY INTO table</pre> to upload the file.\
\ Recommended for large production workloads for better speed and scalability."
required:
- "method"
properties:
method:
type: "string"
enum:
- "Standard"
default: "Standard"
- "Internal Staging"
default: "Internal Staging"
- title: "AWS S3 Staging"
additionalProperties: false
description: "Writes large batches of records to a file, uploads the file\
@@ -44,4 +44,5 @@ public void close(final boolean hasFailed) {
fieldsWithRefDefinition.clear();
super.close(hasFailed);
}

}
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery.strategy;

import com.fasterxml.jackson.databind.JsonNode;
@@ -36,11 +40,11 @@ public class BigQueryDenormalizedUploadStandardStrategy extends BigQueryUploadSt
private final Set<String> fieldsWithRefDefinition;

public BigQueryDenormalizedUploadStandardStrategy(BigQuery bigquery,
ConfiguredAirbyteCatalog catalog,
Consumer<AirbyteMessage> outputRecordCollector,
StandardNameTransformer namingResolver,
Set<String> invalidKeys,
Set<String> fieldsWithRefDefinition) {
ConfiguredAirbyteCatalog catalog,
Consumer<AirbyteMessage> outputRecordCollector,
StandardNameTransformer namingResolver,
Set<String> invalidKeys,
Set<String> fieldsWithRefDefinition) {
super(bigquery, catalog, outputRecordCollector);
this.namingResolver = namingResolver;
this.invalidKeys = invalidKeys;
@@ -121,4 +125,5 @@ private JsonNode getObjectNode(FieldList fields, JsonNode root) {
.collect(Collectors.toMap(namingResolver::getIdentifier,
key -> formatData(fields.get(namingResolver.getIdentifier(key)).getSubFields(), root.get(key)))));
}

}
@@ -8,8 +8,6 @@
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.cloud.bigquery.BigQuery;
import io.airbyte.commons.bytes.ByteUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
@@ -24,7 +22,6 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -48,15 +45,14 @@ public class BigQueryRecordConsumer extends FailureTrackingAirbyteMessageConsume

private AirbyteMessage lastStateMessage = null;


protected final Map<UploadingMethod, BigQueryUploadStrategy> bigQueryUploadStrategyMap = new ConcurrentHashMap<>();

public BigQueryRecordConsumer(final BigQuery bigquery,
final Map<AirbyteStreamNameNamespacePair, BigQueryWriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final boolean isGcsUploadingMode,
final boolean isKeepFilesInGcs) {
final Map<AirbyteStreamNameNamespacePair, BigQueryWriteConfig> writeConfigs,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final boolean isGcsUploadingMode,
final boolean isKeepFilesInGcs) {
this.bigquery = bigquery;
this.writeConfigs = writeConfigs;
this.catalog = catalog;
@@ -93,7 +89,6 @@ private void processRecord(AirbyteMessage message) {
}
}


@Override
public void close(final boolean hasFailed) {
LOGGER.info("Started closing all connections");
@@ -139,4 +134,5 @@ private void deleteDataFromGcsBucket() {
s3Client.shutdown();
});
}

}
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery.helpers;

import java.lang.management.ManagementFactory;
@@ -9,8 +13,7 @@ public class LoggerHelper {

private static final Logger LOGGER = LoggerFactory.getLogger(LoggerHelper.class);

private LoggerHelper() {
}
private LoggerHelper() {}

public static void printHeapMemoryConsumption() {
final int mb = 1024 * 1024;
@@ -20,4 +23,5 @@ public static void printHeapMemoryConsumption() {
LOGGER.info("Initial Memory (xms) mb = {}", xms);
LOGGER.info("Max Memory (xmx) : mb = {}", xmx);
}

}
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery.strategy;

import static com.amazonaws.util.StringUtils.UTF8;
@@ -137,4 +141,5 @@ private void loadCsvFromGcsTruncate(final BigQueryWriteConfi
throw new RuntimeException("Column not added during load append \n" + e.toString());
}
}

}
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery.strategy;

import static io.airbyte.integrations.destination.bigquery.helpers.LoggerHelper.printHeapMemoryConsumption;
@@ -46,8 +50,9 @@ public class BigQueryUploadStandardStrategy implements BigQueryUploadStrategy {
private final ConfiguredAirbyteCatalog catalog;
private final Consumer<AirbyteMessage> outputRecordCollector;

public BigQueryUploadStandardStrategy(BigQuery bigquery, ConfiguredAirbyteCatalog catalog,
Consumer<AirbyteMessage> outputRecordCollector) {
public BigQueryUploadStandardStrategy(BigQuery bigquery,
ConfiguredAirbyteCatalog catalog,
Consumer<AirbyteMessage> outputRecordCollector) {
this.bigquery = bigquery;
this.catalog = catalog;
this.outputRecordCollector = outputRecordCollector;
@@ -137,8 +142,8 @@ public void close(List<BigQueryWriteConfig> writeConfigList, boolean hasFailed,
}

private void partitionIfUnpartitioned(final BigQueryWriteConfig bigQueryWriteConfig,
final BigQuery bigquery,
final TableId destinationTableId) {
final BigQuery bigquery,
final TableId destinationTableId) {
try {
final QueryJobConfiguration queryConfig = QueryJobConfiguration
.newBuilder(
@@ -181,10 +186,10 @@ private void partitionIfUnpartitioned(final BigQueryWriteCon

// https://cloud.google.com/bigquery/docs/managing-tables#copying_a_single_source_table
private static void copyTable(
final BigQuery bigquery,
final TableId sourceTableId,
final TableId destinationTableId,
final WriteDisposition syncMode) {
final BigQuery bigquery,
final TableId sourceTableId,
final TableId destinationTableId,
final WriteDisposition syncMode) {
final CopyJobConfiguration configuration = CopyJobConfiguration.newBuilder(destinationTableId, sourceTableId)
.setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.setWriteDisposition(syncMode)
@@ -200,19 +205,20 @@ private static void copyTable(
}

protected String getCreatePartitionedTableFromSelectQuery(final Schema schema,
final String projectId,
final TableId destinationTableId,
final String tmpPartitionTable) {
final String projectId,
final TableId destinationTableId,
final String tmpPartitionTable) {
return String.format("create table `%s.%s.%s` (", projectId, destinationTableId.getDataset(), tmpPartitionTable)
+ schema.getFields().stream()
.map(field -> String.format("%s %s", field.getName(), field.getType()))
.collect(Collectors.joining(", "))
.map(field -> String.format("%s %s", field.getName(), field.getType()))
.collect(Collectors.joining(", "))
+ ") partition by date("
+ JavaBaseConstants.COLUMN_NAME_EMITTED_AT
+ ") as select "
+ schema.getFields().stream()
.map(Field::getName)
.collect(Collectors.joining(", "))
.map(Field::getName)
.collect(Collectors.joining(", "))
+ String.format(" from `%s.%s.%s`", projectId, destinationTableId.getDataset(), destinationTableId.getTable());
}

}
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery.strategy;

import io.airbyte.integrations.destination.bigquery.BigQueryWriteConfig;
@@ -10,4 +14,5 @@ public interface BigQueryUploadStrategy {
void upload(BigQueryWriteConfig writer, AirbyteMessage airbyteMessage, ConfiguredAirbyteCatalog catalog);

void close(List<BigQueryWriteConfig> writeConfigList, boolean hasFailed, AirbyteMessage lastStateMessage);

}
@@ -35,8 +35,8 @@
* Snowflake Internal Staging consists of 4 main parts
*
* CREATE STAGE @TEMP_STAGE_NAME -- Creates a new named internal stage to use for loading data from
* files into Snowflake tables and unloading data from tables into files
* PUT file://local/<file-patterns> @TEMP_STAGE_NAME. --JDBC Driver will upload the files into stage
* files into Snowflake tables and unloading data from tables into files PUT
* file://local/<file-patterns> @TEMP_STAGE_NAME. --JDBC Driver will upload the files into stage
* COPY FROM @TEMP_STAGE_NAME -- Loads data from staged files to an existing table.
* DROP @TEMP_STAGE_NAME -- Drop temporary stage after sync
*/
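For readers unfamiliar with the staging flow that the comment above describes, here is a minimal, illustrative Java sketch (not part of this commit) of the four steps issued over a plain JDBC connection. The JDBC URL, the local file path, the table name MY_TABLE, and the stage name are assumptions made only for this example.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class SnowflakeInternalStagingSketch {

  // Illustrative only: names and URL below are placeholders, not the connector's actual values.
  public static void stageAndLoad(final String snowflakeJdbcUrl) throws SQLException {
    try (final Connection connection = DriverManager.getConnection(snowflakeJdbcUrl);
        final Statement statement = connection.createStatement()) {
      // 1. Create a named internal stage to hold the uploaded files.
      statement.execute("CREATE STAGE IF NOT EXISTS TEMP_STAGE_NAME");
      // 2. PUT: the JDBC driver uploads the local files into the stage.
      statement.execute("PUT file:///tmp/airbyte_records_*.csv @TEMP_STAGE_NAME");
      // 3. COPY INTO: load the staged files into the destination table.
      statement.execute("COPY INTO MY_TABLE FROM @TEMP_STAGE_NAME");
      // 4. Drop the temporary stage once the sync is done.
      statement.execute("DROP STAGE IF EXISTS TEMP_STAGE_NAME");
    }
  }
}

The connector builds these statements programmatically; the sketch only mirrors the sequence that the Javadoc documents.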
@@ -375,7 +375,7 @@ protected void initTests() {
.sourceType("pg_lsn")
.airbyteType(JsonSchemaPrimitive.STRING)
.addInsertValues("'7/A25801C8'::pg_lsn", "'0/0'::pg_lsn", "null")
.addExpectedValues("7/A25801C8", "0/0",null)
.addExpectedValues("7/A25801C8", "0/0", null)
.build());

// The numeric type in Postgres may contain 'Nan' type, but in JdbcUtils-> rowToJson