From 745ea533c000f41880f0f38763354245a91cf9af Mon Sep 17 00:00:00 2001 From: Oleksandr Tsukanov Date: Wed, 27 Oct 2021 16:44:04 +0300 Subject: [PATCH 01/10] BUG-6638: Added conversion for DATETIME format. --- .../BigQueryDenormalizedRecordConsumer.java | 12 +- .../BigQueryDenormalizedDestinationTest.java | 195 +++-------------- .../BigQueryDenormalizedTestDataUtils.java | 206 ++++++++++++++++++ .../bigquery/BigQueryRecordConsumer.java | 14 +- .../destination/bigquery/BigQueryUtils.java | 46 ++++ 5 files changed, 298 insertions(+), 175 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java index d52da1ffe77bb..68027fec9e05d 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java @@ -40,10 +40,10 @@ public class BigQueryDenormalizedRecordConsumer extends BigQueryRecordConsumer { private final Set invalidKeys; public BigQueryDenormalizedRecordConsumer(final BigQuery bigquery, - final Map writeConfigs, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector, - final StandardNameTransformer namingResolver) { + final Map writeConfigs, + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector, + final StandardNameTransformer namingResolver) { 
super(bigquery, writeConfigs, catalog, outputRecordCollector, false, false); this.namingResolver = namingResolver; invalidKeys = new HashSet<>(); @@ -59,6 +59,10 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage final ObjectNode data = (ObjectNode) formatData(schema.getFields(), recordMessage.getData()); data.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString()); data.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt); + List dateTimeFields = BigQueryUtils.getDateTimeFieldsFromSchema(schema); + if (!dateTimeFields.isEmpty()) { + BigQueryUtils.transformJsonDateTimeToBigDataFormat(recordMessage, dateTimeFields, data); + } return data; } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index aa9026098504a..923aacf608f2c 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.bigquery; +import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.*; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -73,6 +74,10 @@ class BigQueryDenormalizedDestinationTest { .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) .withData(getDataWithFormats()) .withEmittedAt(NOW.toEpochMilli())); + 
private static final AirbyteMessage MESSAGE_USERS4 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) + .withData(getDataWithJSONDateTimeFormats()) + .withEmittedAt(NOW.toEpochMilli())); private JsonNode config; @@ -109,6 +114,7 @@ void setup(final TestInfo info) throws IOException { MESSAGE_USERS1.getRecord().setNamespace(datasetId); MESSAGE_USERS2.getRecord().setNamespace(datasetId); MESSAGE_USERS3.getRecord().setNamespace(datasetId); + MESSAGE_USERS4.getRecord().setNamespace(datasetId); final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build(); dataset = bigquery.create(datasetInfo); @@ -199,7 +205,7 @@ void testWriteWithFormat() throws Exception { // Bigquery's datetime type accepts multiple input format but always outputs the same, so we can't // expect to receive the value we sent. - assertEquals(extractJsonValues(resultJson, "updated_at"), Set.of("2018-08-19T12:11:35.220")); + assertEquals(extractJsonValues(resultJson, "updated_at"), Set.of("2021-10-11T06:36:53")); final Schema expectedSchema = Schema.of( Field.of("name", StandardSQLTypeName.STRING), @@ -211,6 +217,25 @@ void testWriteWithFormat() throws Exception { assertEquals(BigQueryUtils.getTableDefinition(bigquery, dataset.getDatasetId().getDataset(), USERS_STREAM_NAME).getSchema(), expectedSchema); } + @Test + void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception { + catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() + .withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithDateTime())) + .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); + + final BigQueryDestination destination = new BigQueryDenormalizedDestination(); + final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, 
Destination::defaultOutputRecordCollector); + + consumer.accept(MESSAGE_USERS4); + consumer.close(); + + final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); + assertEquals(usersActual.size(), 1); + final JsonNode resultJson = usersActual.get(0); + + assertEquals(Set.of("2021-10-11T09:36:53"), extractJsonValues(resultJson, "updated_at")); + } + private Set extractJsonValues(final JsonNode node, final String attributeName) { final List valuesNode = node.findValues(attributeName); final Set resultSet = new HashSet<>(); @@ -233,7 +258,6 @@ private List retrieveRecordsAsJson(final String tableName) throws Exce .newBuilder( String.format("select TO_JSON_STRING(t) as jsonValue from %s.%s t;", dataset.getDatasetId().getDataset(), tableName.toLowerCase())) .setUseLegacySql(false).build(); - BigQueryUtils.executeQuery(bigquery, queryConfig); return StreamSupport @@ -249,171 +273,4 @@ private static Stream schemaAndDataProvider() { arguments(getSchemaWithInvalidArrayType(), MESSAGE_USERS1), arguments(getSchema(), MESSAGE_USERS2)); } - - private static JsonNode getSchema() { - return Jsons.deserialize( - "{\n" - + " \"type\": [\n" - + " \"object\"\n" - + " ],\n" - + " \"properties\": {\n" - + " \"name\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ]\n" - + " },\n" - + " \"permissions\": {\n" - + " \"type\": [\n" - + " \"array\"\n" - + " ],\n" - + " \"items\": {\n" - + " \"type\": [\n" - + " \"object\"\n" - + " ],\n" - + " \"properties\": {\n" - + " \"domain\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ]\n" - + " },\n" - + " \"grants\": {\n" - + " \"type\": [\n" - + " \"array\"\n" - + " ],\n" - + " \"items\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ]\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + "}"); - - } - - private static JsonNode getSchemaWithFormats() { - return Jsons.deserialize( - "{\n" - + " \"type\": [\n" - + " \"object\"\n" - + " ],\n" - + " \"properties\": {\n" - + " \"name\": {\n" - + " 
\"type\": [\n" - + " \"string\"\n" - + " ]\n" - + " },\n" - + " \"date_of_birth\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ],\n" - + " \"format\": \"date\"\n" - + " },\n" - + " \"updated_at\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ],\n" - + " \"format\": \"date-time\"\n" - + " }\n" - + " }\n" - + "}"); - } - - private static JsonNode getSchemaWithInvalidArrayType() { - return Jsons.deserialize( - "{\n" - + " \"type\": [\n" - + " \"object\"\n" - + " ],\n" - + " \"properties\": {\n" - + " \"name\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ]\n" - + " },\n" - + " \"permissions\": {\n" - + " \"type\": [\n" - + " \"array\"\n" - + " ],\n" - + " \"items\": {\n" - + " \"type\": [\n" - + " \"object\"\n" - + " ],\n" - + " \"properties\": {\n" - + " \"domain\": {\n" - + " \"type\": [\n" - + " \"string\"\n" - + " ]\n" - + " },\n" - + " \"grants\": {\n" - + " \"type\": [\n" - + " \"array\"\n" // missed "items" element - + " ]\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + "}"); - - } - - private static JsonNode getData() { - return Jsons.deserialize( - "{\n" - + " \"name\": \"Andrii\",\n" - + " \"permissions\": [\n" - + " {\n" - + " \"domain\": \"abs\",\n" - + " \"grants\": [\n" - + " \"admin\"\n" - + " ]\n" - + " },\n" - + " {\n" - + " \"domain\": \"tools\",\n" - + " \"grants\": [\n" - + " \"read\", \"write\"\n" - + " ]\n" - + " }\n" - + " ]\n" - + "}"); - } - - private static JsonNode getDataWithFormats() { - return Jsons.deserialize( - "{\n" - + " \"name\": \"Andrii\",\n" - + " \"date_of_birth\": \"1996-01-25\",\n" - + " \"updated_at\": \"2018-08-19 12:11:35.22\"\n" - + "}"); - } - - private static JsonNode getDataWithEmptyObjectAndArray() { - return Jsons.deserialize( - "{\n" - + " \"name\": \"Andrii\",\n" - + " \"permissions\": [\n" - + " {\n" - + " \"domain\": \"abs\",\n" - + " \"items\": {},\n" // empty object - + " \"grants\": [\n" - + " \"admin\"\n" - + " ]\n" - + " },\n" - + " {\n" - + " \"domain\": \"tools\",\n" - + 
" \"grants\": [],\n" // empty array - + " \"items\": {\n" // object with empty array and object - + " \"object\": {},\n" - + " \"array\": []\n" - + " }\n" - + " }\n" - + " ]\n" - + "}"); - - } - } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java new file mode 100644 index 0000000000000..6d132b0ce914f --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java @@ -0,0 +1,206 @@ +package io.airbyte.integrations.destination.bigquery.util; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; + +public class BigQueryDenormalizedTestDataUtils { + + public static JsonNode getSchema() { + return Jsons.deserialize( + "{\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"accepts_marketing_updated_at\": {\n" + + " \"type\": [\n" + + " \"null\",\n" + + " \"string\"\n" + + " ],\n" + + " \"format\": \"date-time\"\n" + + " },\n" + + " \"name\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " },\n" + + " \"permissions\": {\n" + + " \"type\": [\n" + + " \"array\"\n" + + " ],\n" + + " \"items\": {\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"domain\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " },\n" + + " \"grants\": {\n" + + " \"type\": [\n" + + " \"array\"\n" + + " ],\n" + + " \"items\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"); + + } + + public static JsonNode 
getSchemaWithFormats() { + return Jsons.deserialize( + "{\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"name\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " },\n" + + " \"date_of_birth\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ],\n" + + " \"format\": \"date\"\n" + + " },\n" + + " \"updated_at\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ],\n" + + " \"format\": \"date-time\"\n" + + " }\n" + + " }\n" + + "}"); + } + + public static JsonNode getSchemaWithDateTime() { + return Jsons.deserialize( + "{\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " " + + + "\"updated_at\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ],\n" + + " \"format\": \"date-time\"\n" + + " }\n" + + " }\n" + + "}"); + } + + public static JsonNode getSchemaWithInvalidArrayType() { + return Jsons.deserialize( + "{\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"name\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " },\n" + + " \"permissions\": {\n" + + " \"type\": [\n" + + " \"array\"\n" + + " ],\n" + + " \"items\": {\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"domain\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ]\n" + + " },\n" + + " \"grants\": {\n" + + " \"type\": [\n" + + " \"array\"\n" // missed "items" element + + " ]\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"); + + } + + public static JsonNode getData() { + return Jsons.deserialize( + "{\n" + + " \"name\": \"Andrii\",\n" + + " \"accepts_marketing_updated_at\": \"2021-10-11T06:36:53-07:00\",\n" + + " \"permissions\": [\n" + + " {\n" + + " \"domain\": \"abs\",\n" + + " \"grants\": [\n" + + " \"admin\"\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"domain\": \"tools\",\n" + + " \"grants\": [\n" + + " \"read\", \"write\"\n" + + " ]\n" + + " }\n" + + " ]\n" + + "}"); + } + + public 
static JsonNode getDataWithFormats() { + return Jsons.deserialize( + "{\n" + + " \"name\": \"Andrii\",\n" + + " \"date_of_birth\": \"1996-01-25\",\n" + + " \"updated_at\": \"2021-10-11T06:36:53\"\n" + + "}"); + } + + public static JsonNode getDataWithJSONDateTimeFormats() { + return Jsons.deserialize( + "{\n" + + " \"updated_at\": \"2021-10-11T06:36:53+00:00\"\n" + + "}"); + } + + public static JsonNode getDataWithEmptyObjectAndArray() { + return Jsons.deserialize( + "{\n" + + " \"name\": \"Andrii\",\n" + + " \"permissions\": [\n" + + " {\n" + + " \"domain\": \"abs\",\n" + + " \"items\": {},\n" // empty object + + " \"grants\": [\n" + + " \"admin\"\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"domain\": \"tools\",\n" + + " \"grants\": [],\n" // empty array + + " \"items\": {\n" // object with empty array and object + + " \"object\": {},\n" + + " \"array\": []\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"); + } +} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index ce725ab48484a..b69b00ab54175 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -10,6 +10,7 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.CopyJobConfiguration; @@ -23,6 +24,7 @@ import 
com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.QueryParameterValue; import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.TableDataWriteChannel; import com.google.cloud.bigquery.TableId; import com.google.common.base.Charsets; @@ -46,6 +48,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -54,6 +57,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,7 +108,6 @@ public void acceptTracked(final AirbyteMessage message) throws IOException { Jsons.serialize(catalog), Jsons.serialize(recordMessage))); } final BigQueryWriteConfig writer = writeConfigs.get(pair); - // select the way of uploading - normal or through the GCS if (writer.getGcsCsvWriter() == null) { // Normal uploading way @@ -120,6 +123,8 @@ public void acceptTracked(final AirbyteMessage message) throws IOException { throw new RuntimeException(e); } } else { + long emittedAtSeconds = TimeUnit.SECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS); + recordMessage.setEmittedAt(emittedAtSeconds); // GCS uploading way, this data will be moved to bigquery in close method final GcsCsvWriter gcsCsvWriter = writer.getGcsCsvWriter(); writeRecordToCsv(gcsCsvWriter, recordMessage); @@ -134,7 +139,11 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage // use BQ helpers to string-format correctly. 
final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS); final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue(); - final JsonNode formattedData = StandardNameTransformer.formatJsonPath(recordMessage.getData()); + final ObjectNode formattedData = (ObjectNode) StandardNameTransformer.formatJsonPath(recordMessage.getData()); + List dateTimeFields = BigQueryUtils.getDateTimeFieldsFromSchema(schema); + if (!dateTimeFields.isEmpty()) { + BigQueryUtils.transformJsonDateTimeToBigDataFormat(recordMessage, dateTimeFields, formattedData); + } return Jsons.jsonNode(ImmutableMap.of( JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(), JavaBaseConstants.COLUMN_NAME_DATA, Jsons.serialize(formattedData), @@ -158,6 +167,7 @@ protected void writeRecordToCsv(final GcsCsvWriter gcsCsvWriter, final AirbyteRe } } + @Override public void close(final boolean hasFailed) { LOGGER.info("Started closing all connections"); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index d5fc8a397cb07..9decb2fe438f5 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -5,16 +5,20 @@ package io.airbyte.integrations.destination.bigquery; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.Clustering; import com.google.cloud.bigquery.Dataset; import 
com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobId; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.QueryParameterValue; import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; @@ -24,15 +28,20 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.UUID; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class BigQueryUtils { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryUtils.class); + private static final String BIG_QUERY_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS"; static ImmutablePair executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) { final JobId jobId = JobId.of(UUID.randomUUID().toString()); @@ -143,4 +152,41 @@ static TableDefinition getTableDefinition(final BigQuery bigquery, final String return bigquery.getTable(tableId).getDefinition(); } + /** + * @param schema - schema of the database + * @return The list of fields with datetime format. 
+ * + */ + public static List getDateTimeFieldsFromSchema(Schema schema) { + List dateTimeFields = new ArrayList<>(); + for (Field field : schema.getFields()) { + if (field.getType().getStandardType().equals(StandardSQLTypeName.DATETIME)) { + dateTimeFields.add(field.getName()); + } + } + return dateTimeFields; + } + + /** + * @param recordMessage - the record will be processed + * @param timeStampFields - list contains fields of DATETIME format + * @param data - Json will be sent to Google BigData service + * + * The special DATETIME format is required to save this type to BigQuery. + * @see Supported Google bigquery datatype + * This method is responsible to adapt JSON DATETIME to Bigquery + */ + public static void transformJsonDateTimeToBigDataFormat(AirbyteRecordMessage recordMessage, List timeStampFields, ObjectNode data) { + timeStampFields.forEach(e -> { + if (recordMessage.getData().findValue(e) != null && !recordMessage.getData().get(e).isNull()) { + String googleBigQueryDateFormat = QueryParameterValue + .dateTime(new DateTime(recordMessage.getData() + .findValue(e) + .asText()) + .toString(BIG_QUERY_DATETIME_FORMAT)) + .getValue(); + data.put(e, googleBigQueryDateFormat); + } + }); + } } From 74f3a98012075db61f6ab968a9189b040faabe70 Mon Sep 17 00:00:00 2001 From: Oleksandr Tsukanov Date: Wed, 27 Oct 2021 18:05:07 +0300 Subject: [PATCH 02/10] BUG-6638: Added conversion for DATETIME format. 
--- .../destination/bigquery/BigQueryRecordConsumer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index b69b00ab54175..c605ccae5e8fb 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -123,8 +123,6 @@ public void acceptTracked(final AirbyteMessage message) throws IOException { throw new RuntimeException(e); } } else { - long emittedAtSeconds = TimeUnit.SECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS); - recordMessage.setEmittedAt(emittedAtSeconds); // GCS uploading way, this data will be moved to bigquery in close method final GcsCsvWriter gcsCsvWriter = writer.getGcsCsvWriter(); writeRecordToCsv(gcsCsvWriter, recordMessage); From 9802b04eb985e202a62773677aa22ebaec9e0c7b Mon Sep 17 00:00:00 2001 From: Oleksandr Tsukanov Date: Fri, 29 Oct 2021 10:53:50 +0300 Subject: [PATCH 03/10] BUG-6638: Added conversion for DATETIME format. 
--- .../bigquery/BigQueryDenormalizedDestinationTest.java | 2 +- .../bigquery/util/BigQueryDenormalizedTestDataUtils.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index 923aacf608f2c..b0f32435313db 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -233,7 +233,7 @@ void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception { assertEquals(usersActual.size(), 1); final JsonNode resultJson = usersActual.get(0); - assertEquals(Set.of("2021-10-11T09:36:53"), extractJsonValues(resultJson, "updated_at")); + assertEquals(Set.of("2021-10-11T08:36:53"), extractJsonValues(resultJson, "updated_at")); } private Set extractJsonValues(final JsonNode node, final String attributeName) { diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java index 6d132b0ce914f..c50e44b7a1ab5 100644 --- 
a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java @@ -176,7 +176,7 @@ public static JsonNode getDataWithFormats() { public static JsonNode getDataWithJSONDateTimeFormats() { return Jsons.deserialize( "{\n" - + " \"updated_at\": \"2021-10-11T06:36:53+00:00\"\n" + + " \"updated_at\": \"2021-10-11T06:36:53+01:00\"\n" + "}"); } From e89b58773513a99ab7bb354e2864cb0c2c425270 Mon Sep 17 00:00:00 2001 From: Oleksandr Tsukanov Date: Fri, 29 Oct 2021 11:29:19 +0300 Subject: [PATCH 04/10] BUG-6638: Added conversion for DATETIME format. --- .../bigquery/BigQueryDenormalizedDestinationTest.java | 5 ++++- .../bigquery/util/BigQueryDenormalizedTestDataUtils.java | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index b0f32435313db..77bafafb793a0 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -43,6 +43,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import org.joda.time.DateTime; import 
org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -233,7 +234,9 @@ void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception { assertEquals(usersActual.size(), 1); final JsonNode resultJson = usersActual.get(0); - assertEquals(Set.of("2021-10-11T08:36:53"), extractJsonValues(resultJson, "updated_at")); + // BigQuery Accepts "YYYY-MM-DD HH:MM:SS[.SSSSSS]" format + // returns "yyyy-MM-dd'T'HH:mm:ss" format + assertEquals(Set.of(new DateTime("2021-10-11T06:36:53+00:00").toString("yyyy-MM-dd'T'HH:mm:ss")), extractJsonValues(resultJson, "updated_at")); } private Set extractJsonValues(final JsonNode node, final String attributeName) { diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java index c50e44b7a1ab5..6d132b0ce914f 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java @@ -176,7 +176,7 @@ public static JsonNode getDataWithFormats() { public static JsonNode getDataWithJSONDateTimeFormats() { return Jsons.deserialize( "{\n" - + " \"updated_at\": \"2021-10-11T06:36:53+01:00\"\n" + + " \"updated_at\": \"2021-10-11T06:36:53+00:00\"\n" + "}"); } From 12de917c82c74dde5ef0ae8a2620b5808fcae9e8 Mon Sep 17 00:00:00 2001 From: Oleksandr Tsukanov Date: Fri, 29 Oct 2021 14:36:01 +0300 Subject: [PATCH 05/10] BUG-6638: Bumped versions and docs. 
--- .../079d5540-f236-4294-ba7c-ade8fd918496.json | 2 +- .../22f6c74f-5699-40ff-833c-4a879ea40133.json | 2 +- .../init/src/main/resources/seed/destination_definitions.yaml | 4 ++-- .../connectors/destination-bigquery-denormalized/Dockerfile | 2 +- .../connectors/destination-bigquery/Dockerfile | 2 +- docs/integrations/destinations/bigquery.md | 1 + 6 files changed, 7 insertions(+), 6 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json index ea1fd8ef6ef9b..2a948beb7b301 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/079d5540-f236-4294-ba7c-ade8fd918496.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496", "name": "BigQuery (denormalized typed struct)", "dockerRepository": "airbyte/destination-bigquery-denormalized", - "dockerImageTag": "0.1.6", + "dockerImageTag": "0.1.8", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json index 74c7f090c8f43..1b7cb29ae9d64 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133", "name": "BigQuery", "dockerRepository": "airbyte/destination-bigquery", - 
"dockerImageTag": "0.4.1", + "dockerImageTag": "0.4.2", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 8f37a86f8f06d..cbbcb9c32508b 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -6,12 +6,12 @@ - name: BigQuery destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 dockerRepository: airbyte/destination-bigquery - dockerImageTag: 0.5.0 + dockerImageTag: 0.5.1 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery - name: BigQuery (denormalized typed struct) destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 dockerRepository: airbyte/destination-bigquery-denormalized - dockerImageTag: 0.1.7 + dockerImageTag: 0.1.8 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery - name: Chargify (Keen) destinationDefinitionId: 81740ce8-d764-4ea7-94df-16bb41de36ae diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index 2ad0b213627c2..ec6426734c09c 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index d507eed69ceb2..7782dd5887aa2 100644 --- 
a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.5.0 +LABEL io.airbyte.version=0.5.1 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 3b691446003a4..2514faed547f6 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -155,6 +155,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.5.1 | 2021-10-27 | [\#7413](https://github.com/airbytehq/airbyte/issues/7413) | Fixed DATETIME conversion for BigQuery | | 0.5.0 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables | | 0.4.1 | 2021-10-04 | [\#6733](https://github.com/airbytehq/airbyte/issues/6733) | Support dataset starting with numbers | | 0.4.0 | 2021-08-26 | [\#5296](https://github.com/airbytehq/airbyte/issues/5296) | Added GCS Staging uploading option | From 53dcd5d43d0120bf2174f178366067559d33ed59 Mon Sep 17 00:00:00 2001 From: Oleksandr Tsukanov Date: Fri, 29 Oct 2021 16:25:53 +0300 Subject: [PATCH 06/10] BUG-6638: bigquery.md --- docs/integrations/destinations/bigquery.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 2514faed547f6..ee1eeab3d43b2 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -170,6 +170,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.8 | 2021-10-27 | 
[\#7413](https://github.com/airbytehq/airbyte/issues/7413) | Fixed DATETIME conversion for BigQuery | | 0.1.7 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables | | 0.1.6 | 2021-09-16 | [\#6145](https://github.com/airbytehq/airbyte/pull/6145) | BigQuery Denormalized support for date, datetime & timestamp types through the json "format" key | | 0.1.5 | 2021-09-07 | [\#5881](https://github.com/airbytehq/airbyte/pull/5881) | BigQuery Denormalized NPE fix | From 6511587481383f3d5ebda9bd74ffe78f918098ee Mon Sep 17 00:00:00 2001 From: Oleksandr Tsukanov Date: Mon, 1 Nov 2021 12:15:37 +0200 Subject: [PATCH 07/10] BUG-6638: Removed datetime handling for BigQuery destination connector. --- .../22f6c74f-5699-40ff-833c-4a879ea40133.json | 2 +- .../main/resources/seed/destination_definitions.yaml | 2 +- .../connectors/destination-bigquery/Dockerfile | 2 +- .../destination/bigquery/BigQueryRecordConsumer.java | 12 ++---------- docs/integrations/destinations/bigquery.md | 1 - 5 files changed, 5 insertions(+), 14 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json index 1b7cb29ae9d64..74c7f090c8f43 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133", "name": "BigQuery", "dockerRepository": "airbyte/destination-bigquery", - "dockerImageTag": "0.4.2", + "dockerImageTag": "0.4.1", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery" } diff --git 
a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index cbbcb9c32508b..ff89e7a3e60df 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -6,7 +6,7 @@ - name: BigQuery destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 dockerRepository: airbyte/destination-bigquery - dockerImageTag: 0.5.1 + dockerImageTag: 0.5.0 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery - name: BigQuery (denormalized typed struct) destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 7782dd5887aa2..d507eed69ceb2 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.5.1 +LABEL io.airbyte.version=0.5.0 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java index c605ccae5e8fb..ce725ab48484a 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java @@ -10,7 +10,6 @@ import 
com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.CopyJobConfiguration; @@ -24,7 +23,6 @@ import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.QueryParameterValue; import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.TableDataWriteChannel; import com.google.cloud.bigquery.TableId; import com.google.common.base.Charsets; @@ -48,7 +46,6 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -57,7 +54,6 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; -import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,6 +104,7 @@ public void acceptTracked(final AirbyteMessage message) throws IOException { Jsons.serialize(catalog), Jsons.serialize(recordMessage))); } final BigQueryWriteConfig writer = writeConfigs.get(pair); + // select the way of uploading - normal or through the GCS if (writer.getGcsCsvWriter() == null) { // Normal uploading way @@ -137,11 +134,7 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage // use BQ helpers to string-format correctly. 
final long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(recordMessage.getEmittedAt(), TimeUnit.MILLISECONDS); final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue(); - final ObjectNode formattedData = (ObjectNode) StandardNameTransformer.formatJsonPath(recordMessage.getData()); - List dateTimeFields = BigQueryUtils.getDateTimeFieldsFromSchema(schema); - if (!dateTimeFields.isEmpty()) { - BigQueryUtils.transformJsonDateTimeToBigDataFormat(recordMessage, dateTimeFields, formattedData); - } + final JsonNode formattedData = StandardNameTransformer.formatJsonPath(recordMessage.getData()); return Jsons.jsonNode(ImmutableMap.of( JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(), JavaBaseConstants.COLUMN_NAME_DATA, Jsons.serialize(formattedData), @@ -165,7 +158,6 @@ protected void writeRecordToCsv(final GcsCsvWriter gcsCsvWriter, final AirbyteRe } } - @Override public void close(final boolean hasFailed) { LOGGER.info("Started closing all connections"); diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index ee1eeab3d43b2..ede3b62ab2336 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -155,7 +155,6 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | -| 0.5.1 | 2021-10-27 | [\#7413](https://github.com/airbytehq/airbyte/issues/7413) | Fixed DATETIME conversion for BigQuery | | 0.5.0 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables | | 0.4.1 | 2021-10-04 | [\#6733](https://github.com/airbytehq/airbyte/issues/6733) | Support dataset starting with numbers | | 0.4.0 | 2021-08-26 | [\#5296](https://github.com/airbytehq/airbyte/issues/5296) | Added GCS Staging uploading option | From 137c45dda82fd85e7704183796a8dbf522eb101c Mon Sep 17 
00:00:00 2001 From: Oleksandr Tsukanov Date: Mon, 1 Nov 2021 13:06:51 +0200 Subject: [PATCH 08/10] BUG-6638: Added BOOTSTRAP.md. --- .../destination-bigquery-denormalized/BOOTSTRAP.md | 5 +++++ .../connectors/destination-bigquery/BOOTSTRAP.md | 8 ++++++++ 2 files changed, 13 insertions(+) create mode 100644 airbyte-integrations/connectors/destination-bigquery-denormalized/BOOTSTRAP.md create mode 100644 airbyte-integrations/connectors/destination-bigquery/BOOTSTRAP.md diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/BOOTSTRAP.md b/airbyte-integrations/connectors/destination-bigquery-denormalized/BOOTSTRAP.md new file mode 100644 index 0000000000000..edb26b327d2a6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/BOOTSTRAP.md @@ -0,0 +1,5 @@ +# BigQuery Denormalized Destination Connector Bootstrap + +Instead of splitting the final data into multiple tables, this destination leverages BigQuery capabilities with [Structured and Repeated fields](https://cloud.google.com/bigquery/docs/nested-repeated) to produce a single "big" table per stream. This does not write the `_airbyte_raw_*` tables in the destination and normalization from this connector is not supported at this time. + +See [this](https://docs.airbyte.io/integrations/destinations/bigquery) link for the nuances about the connector. \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-bigquery/BOOTSTRAP.md b/airbyte-integrations/connectors/destination-bigquery/BOOTSTRAP.md new file mode 100644 index 0000000000000..9a5d31b122345 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/BOOTSTRAP.md @@ -0,0 +1,8 @@ +# BigQuery Destination Connector Bootstrap + +BigQuery is a serverless, highly scalable, and cost-effective data warehouse +offered by Google Cloud Platform. + +The BigQuery connector produces the standard Airbyte outputs using `_airbyte_raw_*` tables storing the JSON blob data first.
Afterward, these are transformed and normalized into separate tables, potentially "exploding" nested streams into their own tables if [basic normalization](https://docs.airbyte.io/understanding-airbyte/basic-normalization) is configured. + +See [this](https://docs.airbyte.io/integrations/destinations/bigquery) link for more information about the connector. From 49ae963c59f9f8dbe1f04e94a5b46d1253bb39e5 Mon Sep 17 00:00:00 2001 From: Oleksandr Tsukanov Date: Wed, 3 Nov 2021 10:01:11 +0200 Subject: [PATCH 09/10] BUG-6638: Added handling of nested datetime. --- .../BigQueryDenormalizedRecordConsumer.java | 9 +++++---- .../BigQueryDenormalizedDestinationTest.java | 2 ++ .../BigQueryDenormalizedTestDataUtils.java | 20 ++++++++++++++++++- .../destination/bigquery/BigQueryUtils.java | 14 ++++++------- 4 files changed, 33 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java index 68027fec9e05d..9048dab2a3b65 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java @@ -59,10 +59,7 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage final ObjectNode data = (ObjectNode) formatData(schema.getFields(), recordMessage.getData()); data.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString()); data.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt); - List dateTimeFields = 
BigQueryUtils.getDateTimeFieldsFromSchema(schema); - if (!dateTimeFields.isEmpty()) { - BigQueryUtils.transformJsonDateTimeToBigDataFormat(recordMessage, dateTimeFields, data); - } + return data; } @@ -71,6 +68,10 @@ protected JsonNode formatData(final FieldList fields, final JsonNode root) { if (fields == null) { return root; } + List dateTimeFields = BigQueryUtils.getDateTimeFieldsFromSchema(fields); + if (!dateTimeFields.isEmpty()) { + BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, (ObjectNode) root); + } if (root.isObject()) { final List fieldNames = fields.stream().map(Field::getName).collect(Collectors.toList()); return Jsons.jsonNode(Jsons.keys(root).stream() diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index 77bafafb793a0..aa13e9fb02c49 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -237,6 +237,8 @@ void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception { // BigQuery Accepts "YYYY-MM-DD HH:MM:SS[.SSSSSS]" format // returns "yyyy-MM-dd'T'HH:mm:ss" format assertEquals(Set.of(new DateTime("2021-10-11T06:36:53+00:00").toString("yyyy-MM-dd'T'HH:mm:ss")), extractJsonValues(resultJson, "updated_at")); + //check nested datetime + assertEquals(Set.of(new DateTime("2021-11-11T06:36:53+00:00").toString("yyyy-MM-dd'T'HH:mm:ss")), extractJsonValues(resultJson.get("items"), 
"nested_datetime")); } private Set extractJsonValues(final JsonNode node, final String attributeName) { diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java index 6d132b0ce914f..c2fa24cdec102 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java @@ -98,6 +98,21 @@ public static JsonNode getSchemaWithDateTime() { + " \"string\"\n" + " ],\n" + " \"format\": \"date-time\"\n" + + " },\n" + + " \"items\": {\n" + + " \"type\": [\n" + + " \"object\"\n" + + " ],\n" + + " \"properties\": {\n" + + " \"nested_datetime\": {\n" + + " \"type\": [\n" + + " \"string\"\n" + + " ],\n" + + " \"format\": \"date-time\"\n" + + " }\n" + + + " " + + "}\n" + " }\n" + " }\n" + "}"); @@ -176,7 +191,10 @@ public static JsonNode getDataWithFormats() { public static JsonNode getDataWithJSONDateTimeFormats() { return Jsons.deserialize( "{\n" - + " \"updated_at\": \"2021-10-11T06:36:53+00:00\"\n" + + " \"updated_at\": \"2021-10-11T06:36:53+00:00\",\n" + + " \"items\": {\n" + + " \"nested_datetime\": \"2021-11-11T06:36:53+00:00\"\n" + + " }\n" + "}"); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index 9decb2fe438f5..842120e71a572 
100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -12,6 +12,7 @@ import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobId; import com.google.cloud.bigquery.JobInfo; @@ -153,13 +154,13 @@ static TableDefinition getTableDefinition(final BigQuery bigquery, final String } /** - * @param schema - schema of the database + * @param fieldList - the list to be checked * @return The list of fields with datetime format. * */ - public static List getDateTimeFieldsFromSchema(Schema schema) { + public static List getDateTimeFieldsFromSchema(FieldList fieldList) { List dateTimeFields = new ArrayList<>(); - for (Field field : schema.getFields()) { + for (Field field : fieldList) { if (field.getType().getStandardType().equals(StandardSQLTypeName.DATETIME)) { dateTimeFields.add(field.getName()); } @@ -168,7 +169,6 @@ public static List getDateTimeFieldsFromSchema(Schema schema) { } /** - * @param recordMessage - the record will be processed * @param timeStampFields - list contains fields of DATETIME format * @param data - Json will be sent to Google BigData service * @@ -176,11 +176,11 @@ public static List getDateTimeFieldsFromSchema(Schema schema) { * @see Supported Google bigquery datatype * This method is responsible to adapt JSON DATETIME to Bigquery */ - public static void transformJsonDateTimeToBigDataFormat(AirbyteRecordMessage recordMessage, List timeStampFields, ObjectNode data) { + public static void transformJsonDateTimeToBigDataFormat(List timeStampFields, ObjectNode data) { timeStampFields.forEach(e -> { - if (recordMessage.getData().findValue(e) != 
null && !recordMessage.getData().get(e).isNull()) { + if (data.findValue(e) != null && !data.get(e).isNull()) { String googleBigQueryDateFormat = QueryParameterValue - .dateTime(new DateTime(recordMessage.getData() + .dateTime(new DateTime(data .findValue(e) .asText()) .toString(BIG_QUERY_DATETIME_FORMAT)) From a0b017f5022e4479971aaf2a0904633f944035f2 Mon Sep 17 00:00:00 2001 From: Oleksandr Tsukanov Date: Wed, 3 Nov 2021 10:18:49 +0200 Subject: [PATCH 10/10] BUG-6638: renaming of argument list. --- .../integrations/destination/bigquery/BigQueryUtils.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java index 842120e71a572..613ec652407ff 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java @@ -169,15 +169,15 @@ public static List getDateTimeFieldsFromSchema(FieldList fieldList) { } /** - * @param timeStampFields - list contains fields of DATETIME format + * @param dateTimeFields - list contains fields of DATETIME format * @param data - Json will be sent to Google BigData service * * The special DATETIME format is required to save this type to BigQuery. 
* @see Supported Google bigquery datatype * This method is responsible to adapt JSON DATETIME to Bigquery */ - public static void transformJsonDateTimeToBigDataFormat(List timeStampFields, ObjectNode data) { - timeStampFields.forEach(e -> { + public static void transformJsonDateTimeToBigDataFormat(List dateTimeFields, ObjectNode data) { + dateTimeFields.forEach(e -> { if (data.findValue(e) != null && !data.get(e).isNull()) { String googleBigQueryDateFormat = QueryParameterValue .dateTime(new DateTime(data