Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Destination S3 Support writing timestamps #7732

Merged
merged 17 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,20 @@ Schema getSingleFieldType(final String fieldName, final JsonSchemaType fieldType

final Schema fieldSchema;
switch (fieldType) {
case STRING, NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType());
case NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType());
case STRING -> {
if (fieldDefinition.has("format")) {
String format = fieldDefinition.get("format").asText();
fieldSchema = switch (format) {
case "date-time" -> LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
case "date" -> LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
case "time" -> LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
default -> Schema.create(fieldType.getAvroType());
};
} else {
fieldSchema = Schema.create(fieldType.getAvroType());
}
}
case COMBINED -> {
final Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
final List<Schema> unionTypes = getSchemasFromTypes(fieldName, (ArrayNode) combinedRestriction.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper;
import io.airbyte.integrations.destination.s3.writer.BaseS3Writer;
import io.airbyte.integrations.destination.s3.writer.S3Writer;
Expand Down Expand Up @@ -66,6 +68,11 @@ public S3AvroWriter(final S3DestinationConfig config,

/**
 * Appends one record to the Avro data file.
 *
 * Before the record is converted to Avro, its JSON payload is passed through
 * {@code AvroRecordHelper.transformDateTimeInJson} together with the stream's JSON schema.
 * The payload node obtained from {@code recordMessage} is handed to the helper and then set
 * back on the same message, so the caller's {@code recordMessage} is modified by this call.
 */
@Override
public void write(final UUID id, final AirbyteRecordMessage recordMessage) throws IOException {
  final JsonNode payload = recordMessage.getData();
  AvroRecordHelper.transformDateTimeInJson(getStream().getJsonSchema(), payload);
  recordMessage.setData(payload);

  dataFileWriter.append(avroRecordFactory.getAvroRecord(id, recordMessage));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package io.airbyte.integrations.destination.s3.parquet;

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import io.airbyte.integrations.destination.s3.writer.BaseS3Writer;
import io.airbyte.integrations.destination.s3.writer.S3Writer;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand Down Expand Up @@ -105,6 +107,10 @@ public String getOutputFilename() {

/**
 * Writes one record through the Parquet writer.
 *
 * The record's JSON payload is first passed through
 * {@code AvroRecordHelper.transformDateTimeInJson} together with the stream's JSON schema and
 * then set back on {@code recordMessage}, so the caller's message is modified by this call.
 */
@Override
public void write(final UUID id, final AirbyteRecordMessage recordMessage) throws IOException {
  final JsonNode payload = recordMessage.getData();
  AvroRecordHelper.transformDateTimeInJson(getStream().getJsonSchema(), payload);
  recordMessage.setData(payload);
  parquetWriter.write(avroRecordFactory.getAvroRecord(id, recordMessage));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@
package io.airbyte.integrations.destination.s3.util;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ValueNode;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
import java.util.*;
import java.util.stream.Collectors;

/**
* Helper methods for unit tests. This is needed by multiple modules, so it is in the src directory.
Expand Down Expand Up @@ -45,4 +51,98 @@ public static JsonNode pruneAirbyteJson(final JsonNode input) {
return output;
}

/**
 * Recursively flattens a JSON tree into {@code map}, one entry per retained leaf value.
 *
 * Keys are the "/"-separated path to the leaf, prefixed with an extra "/"; object members
 * contribute their field name and array elements their index. Values are the leaf's text form.
 *
 * When {@code jsonSchema} is true only leaves accepted by {@code schemaContainsProperties} for the
 * key "format" and the values "date", "date-time" and "time" are kept. Otherwise every leaf is
 * kept unless its text is "null", blank, or parses as boolean {@code true}.
 *
 * @param currentPath path of {@code jsonNode} relative to the root ("" for the root itself)
 * @param jsonNode node to walk
 * @param map output map that receives the path/value pairs
 * @param jsonSchema whether {@code jsonNode} belongs to a JSON schema rather than to a record
 */
private static void addKeys(String currentPath, JsonNode jsonNode, Map<String, String> map, boolean jsonSchema) {
  if (jsonNode.isObject()) {
    final String prefix = currentPath.isEmpty() ? "" : currentPath + "/";
    jsonNode.fields().forEachRemaining(field -> addKeys(prefix + field.getKey(), field.getValue(), map, jsonSchema));
    return;
  }

  if (jsonNode.isArray()) {
    int index = 0;
    for (final JsonNode element : jsonNode) {
      addKeys(currentPath + "/" + index, element, map, jsonSchema);
      index++;
    }
    return;
  }

  if (!jsonNode.isValueNode()) {
    return;
  }

  final String text = jsonNode.asText();
  if (jsonSchema) {
    if (schemaContainsProperties(currentPath, (ValueNode) jsonNode, "format", List.of("date", "date-time", "time"))) {
      map.put("/" + currentPath, text);
    }
    return;
  }

  // Record leaves that are "null", blank or boolean true are left out of the map.
  if (text.equals("null") || text.isBlank() || Boolean.parseBoolean(text)) {
    return;
  }
  map.put("/" + currentPath, text);
}

/**
 * Tells whether a schema leaf is the keyword {@code key} carrying one of the given values.
 *
 * The last segment of {@code currentPath} must be exactly {@code key}; a path that merely ends
 * with the same characters (for example ".../dateformat" when {@code key} is "format") is not a
 * match.
 *
 * @param currentPath "/"-separated path of the leaf, without a leading "/"
 * @param valueNode the leaf node found at that path
 * @param key schema keyword expected as the last path segment
 * @param properties accepted textual values for the leaf
 * @return true when the last path segment equals {@code key} and the leaf text is in {@code properties}
 */
private static boolean schemaContainsProperties(String currentPath, ValueNode valueNode, String key, List<String> properties) {
  final boolean lastSegmentIsKey = currentPath.equals(key) || currentPath.endsWith("/" + key);
  return lastSegmentIsKey && properties.contains(valueNode.asText());
}

/**
 * Converts, in place, the values of {@code recordMessageData} whose schema entry declares a
 * "date", "date-time" or "time" format into numeric nodes.
 *
 * The schema is flattened first; when it declares none of those formats the record is left
 * untouched and is not traversed at all.
 *
 * @param jsonSchema JSON schema describing the record
 * @param recordMessageData record payload; modified by this call
 */
public static void transformDateTimeInJson(JsonNode jsonSchema, JsonNode recordMessageData) {
  final Map<String, String> schemaMap = new HashMap<>();
  addKeys("", jsonSchema, schemaMap, true);
  if (schemaMap.isEmpty()) {
    // Nothing to convert, so skip flattening the whole record.
    return;
  }

  final Map<String, String> recordMap = new HashMap<>();
  addKeys("", recordMessageData, recordMap, false);
  convertDateTimeInRecord(recordMessageData, recordMap, schemaMap);
}

/**
 * Replaces every record value that corresponds to a date/time schema entry with its numeric form.
 *
 * Schema paths and record paths are compared after normalisation: schema paths drop the
 * "items", "properties" and "format" segments, record paths drop purely numeric segments
 * (array indexes). A record value is converted when both normalised segment lists are equal.
 *
 * NOTE(review): because of that normalisation, record fields literally named "items",
 * "properties" or "format", or fields with numeric names, are not matched reliably; field names
 * are also used in the JSON pointer without escaping "/" or "~". Confirm this is acceptable.
 *
 * @param recordMessageData record payload; matching values are replaced in place
 * @param jsonRecordMap flattened record: "/"-prefixed path to value text
 * @param schemaMap flattened schema: "/"-prefixed path of a format keyword to the format name
 */
private static void convertDateTimeInRecord(JsonNode recordMessageData, Map<String, String> jsonRecordMap, Map<String, String> schemaMap) {
  if (schemaMap.isEmpty()) {
    return;
  }

  // The normalised form of a record path does not depend on the schema entry being examined,
  // so compute it once per path instead of once per (schema entry, record path) pair.
  final Map<String, List<String>> recordKeysByPath = new HashMap<>();
  jsonRecordMap.keySet().forEach(jsonPath -> recordKeysByPath.put(jsonPath,
      Arrays.stream(jsonPath.split("/"))
          .filter(key -> !key.isBlank())
          .filter(key -> !key.matches("-?\\d+"))
          .collect(Collectors.toList())));

  schemaMap.forEach((schemaKey, format) -> {
    final List<String> schemaKeys = Arrays.stream(schemaKey.split("/"))
        .filter(key -> !key.isBlank())
        .filter(key -> !key.equals("items"))
        .filter(key -> !key.equals("properties"))
        .filter(key -> !key.equals("format"))
        .collect(Collectors.toList());

    jsonRecordMap.forEach((jsonPath, jsonValue) -> {
      // match path of json record and json schema
      if (!schemaKeys.equals(recordKeysByPath.get(jsonPath))) {
        return;
      }
      final int delimiter = jsonPath.lastIndexOf("/");
      final String jsonPointer = jsonPath.substring(0, delimiter);
      final String fieldName = jsonPath.substring(delimiter + 1);
      final JsonNode converted = getNodeForModification(format, jsonValue);
      // numeric fieldName is valid only for ArrayNode
      if (fieldName.matches("-?\\d+")) {
        ((ArrayNode) recordMessageData.at(jsonPointer)).set(Integer.parseInt(fieldName), converted);
      } else {
        // set(...) replaces the deprecated put(String, JsonNode)
        ((ObjectNode) recordMessageData.at(jsonPointer)).set(fieldName, converted);
      }
    });
  });
}

/**
 * Builds the numeric JSON node that replaces a textual date/time value.
 *
 * "date" yields an {@code IntNode} from {@code DateTimeUtils.getEpochDay}, "date-time" a
 * {@code LongNode} from {@code DateTimeUtils.getEpochMillis}, and "time" a {@code LongNode} from
 * {@code DateTimeUtils.getMicroSeconds}.
 *
 * NOTE(review): "date-time" is written as epoch milliseconds. Confirm this matches the Avro
 * logical type declared for such fields; a timestamp-micros field would expect microseconds.
 *
 * @param format schema format name
 * @param dateTime textual value taken from the record
 * @return the replacement node
 * @throws RuntimeException when {@code format} is none of the three supported names
 */
private static JsonNode getNodeForModification(String format, String dateTime) {
  return switch (format) {
    case "date" -> new IntNode(DateTimeUtils.getEpochDay(dateTime));
    case "date-time" -> new LongNode(DateTimeUtils.getEpochMillis(dateTime));
    case "time" -> new LongNode(DateTimeUtils.getMicroSeconds(dateTime));
    default -> throw new RuntimeException("Failed to find necessary date-time format");
  };
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3.util;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Locale;
import java.util.Objects;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Converts textual date, time and date-time values into the numeric representations written to
 * the output files: epoch days, microseconds of the day and epoch milliseconds.
 *
 * Parsing uses fixed English formatters, so month abbreviations such as "Jul" and zone names are
 * understood regardless of the JVM default locale. Values that cannot be parsed raise an
 * {@link IllegalArgumentException} that names the offending input and carries the parse failure
 * as its cause.
 */
public class DateTimeUtils {

  /** Input that is already an integer (optionally negative) is returned without parsing. */
  private static final Pattern NUMERIC = Pattern.compile("-?\\d+");

  /** Lenient date / date-time pattern; every section is optional so several notations are accepted. */
  private static final DateTimeFormatter formatter =
      DateTimeFormatter.ofPattern("[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d]" +
          "[[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]]", Locale.ENGLISH);

  /** Time-of-day pattern: hours and minutes, optionally seconds and a 3 to 6 digit fraction. */
  private static final DateTimeFormatter timeFormatter =
      DateTimeFormatter.ofPattern("HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS]]", Locale.ENGLISH);

  /**
   * Converts a date-time to milliseconds since the epoch.
   *
   * Text carrying a zone or offset is converted using it; text without one is interpreted as UTC.
   * Purely numeric input is returned unchanged.
   *
   * @param dateTime textual date-time, or an integer string
   * @return milliseconds since 1970-01-01T00:00:00Z
   * @throws IllegalArgumentException when {@code dateTime} cannot be parsed
   * @throws NullPointerException when {@code dateTime} is null
   */
  public static Long getEpochMillis(String dateTime) {
    Objects.requireNonNull(dateTime, "dateTime");
    if (NUMERIC.matcher(dateTime).matches()) {
      return Long.valueOf(dateTime);
    }
    Instant instant;
    try {
      instant = ZonedDateTime.parse(dateTime, formatter).toInstant();
    } catch (DateTimeParseException zonedFailure) {
      // No usable zone information: fall back to a local date-time taken as UTC.
      try {
        instant = LocalDateTime.parse(dateTime, formatter).toInstant(ZoneOffset.UTC);
      } catch (DateTimeParseException localFailure) {
        throw new IllegalArgumentException("Failed to parse date-time: " + dateTime, localFailure);
      }
    }
    return instant.toEpochMilli();
  }

  /**
   * Converts a date to the number of days since the epoch.
   *
   * @param dateTime textual date
   * @return days since 1970-01-01
   * @throws IllegalArgumentException when {@code dateTime} cannot be parsed
   * @throws ArithmeticException when the day count does not fit into an {@code int}
   * @throws NullPointerException when {@code dateTime} is null
   */
  public static Integer getEpochDay(String dateTime) {
    Objects.requireNonNull(dateTime, "dateTime");
    try {
      return Math.toIntExact(LocalDate.parse(dateTime, formatter).toEpochDay());
    } catch (DateTimeParseException e) {
      throw new IllegalArgumentException("Failed to parse date: " + dateTime, e);
    }
  }

  /**
   * Converts a time of day to microseconds since midnight.
   *
   * The time-only pattern is tried first, then the general date-time pattern. Purely numeric
   * input is returned unchanged.
   *
   * @param dateTime textual time (or date-time), or an integer string
   * @return microseconds since midnight
   * @throws IllegalArgumentException when {@code dateTime} cannot be parsed
   * @throws NullPointerException when {@code dateTime} is null
   */
  public static Long getMicroSeconds(String dateTime) {
    Objects.requireNonNull(dateTime, "dateTime");
    if (NUMERIC.matcher(dateTime).matches()) {
      return Long.valueOf(dateTime);
    }
    LocalTime time;
    try {
      time = LocalTime.parse(dateTime, timeFormatter);
    } catch (DateTimeParseException timeOnlyFailure) {
      try {
        time = LocalTime.parse(dateTime, formatter);
      } catch (DateTimeParseException dateTimeFailure) {
        throw new IllegalArgumentException("Failed to parse time: " + dateTime, dateTimeFailure);
      }
    }
    // toNanoOfDay() is in nanoseconds; 1000 ns per microsecond.
    return time.toNanoOfDay() / 1000;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ protected BaseS3Writer(final S3DestinationConfig config,
this.outputPrefix = S3OutputPathHelper.getOutputPrefix(config.getBucketPath(), stream);
}

/**
 * @return the value of this writer's {@code stream} field
 */
public AirbyteStream getStream() {
  return this.stream;
}

/**
 * @return the value of this writer's {@code outputPrefix} field
 */
public String getOutputPrefix() {
  return this.outputPrefix;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3.util;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import java.io.IOException;
import org.junit.jupiter.api.Test;

/**
 * Checks that {@code AvroRecordHelper.transformDateTimeInJson} turns the textual date/time values
 * of a nested record into the values stored in the expected fixture.
 */
public class AvroRecordHelperTest {

  @Test
  public void testModifyJsonRecordWithCorrectDateTime() throws IOException {
    final JsonNode schema = Jsons.deserialize(MoreResources.readResource("avro/jsonSchemaWithManyNestedObjects.json"));
    final JsonNode expected = Jsons.deserialize(MoreResources.readResource("avro/dateTimeModified.json"));

    // The helper is given a clone of the parsed input record.
    final JsonNode actual = Jsons.clone(Jsons.deserialize(MoreResources.readResource("avro/dateTimeString.json")));
    AvroRecordHelper.transformDateTimeInJson(schema, actual);

    assertEquals(expected, actual);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3.util;

import static io.airbyte.integrations.destination.s3.util.DateTimeUtils.*;
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

/**
 * Exercises the three conversions offered by {@code DateTimeUtils}. Each conversion has its own
 * test so that a failure points at the affected conversion.
 */
public class DateTimeUtilsTest {

  /** Date-time text in several notations, with and without zone information, to epoch millis. */
  @Test
  public void testDateTimeConversion() {
    assertEquals(1537012800000L, getEpochMillis("2018-09-15 12:00:00"));
    assertEquals(1537012800006L, getEpochMillis("2018-09-15 12:00:00.006542"));
    assertEquals(1537012800000L, getEpochMillis("2018/09/15 12:00:00"));
    assertEquals(1537012800000L, getEpochMillis("2018.09.15 12:00:00"));
    assertEquals(1531656000000L, getEpochMillis("2018 Jul 15 12:00:00"));
    assertEquals(1531627200000L, getEpochMillis("2018 Jul 15 12:00:00 GMT+08:00"));
    assertEquals(1531630800000L, getEpochMillis("2018 Jul 15 12:00:00GMT+07"));
    assertEquals(1609462861000L, getEpochMillis("2021-1-1 01:01:01"));
    assertEquals(1609462861000L, getEpochMillis("2021.1.1 01:01:01"));
    assertEquals(1609462861000L, getEpochMillis("2021/1/1 01:01:01"));
    assertEquals(1609459261000L, getEpochMillis("2021-1-1 01:01:01 +01"));
    assertEquals(1609459261000L, getEpochMillis("2021-01-01T01:01:01+01:00"));
    assertEquals(1609459261546L, getEpochMillis("2021-01-01T01:01:01.546+01:00"));
    assertEquals(1609462861000L, getEpochMillis("2021-01-01 01:01:01"));
    assertEquals(1609462861000L, getEpochMillis("2021-01-01 01:01:01 +0000"));
    assertEquals(1609462861000L, getEpochMillis("2021/01/01 01:01:01 +0000"));
    assertEquals(1609462861000L, getEpochMillis("2021-01-01T01:01:01Z"));
    assertEquals(1609466461000L, getEpochMillis("2021-01-01T01:01:01-01:00"));
    assertEquals(1609462861000L, getEpochMillis("2021-01-01 01:01:01 UTC"));
    assertEquals(1609491661000L, getEpochMillis("2021-01-01T01:01:01 PST"));
    assertEquals(1609462861000L, getEpochMillis("2021-01-01T01:01:01 +0000"));
    assertEquals(1609462861000L, getEpochMillis("2021-01-01T01:01:01+0000"));
    assertEquals(1609462861000L, getEpochMillis("2021-01-01T01:01:01UTC"));
    assertEquals(1609459261000L, getEpochMillis("2021-01-01T01:01:01+01"));
  }

  /** Date text in several notations to epoch days. */
  @Test
  public void testDateConversion() {
    assertEquals(18628, getEpochDay("2021-1-1"));
    assertEquals(18628, getEpochDay("2021-01-01"));
    assertEquals(18629, getEpochDay("2021/01/02"));
    assertEquals(18630, getEpochDay("2021.01.03"));
    assertEquals(18631, getEpochDay("2021 Jan 04"));
  }

  /** Time-of-day text, with and without seconds and fractions, to microseconds of the day. */
  @Test
  public void testTimeConversion() {
    assertEquals(3661000000L, getMicroSeconds("01:01:01"));
    assertEquals(3660000000L, getMicroSeconds("01:01"));
    assertEquals(44581541000L, getMicroSeconds("12:23:01.541"));
    assertEquals(44581541214L, getMicroSeconds("12:23:01.541214"));
  }

}
Loading