Skip to content

Commit

Permalink
🎉 Destination S3 Support writing timestamps (#7732)
Browse files Browse the repository at this point in the history
* get date-time format from json schema

* created universal date-time converter

* implemented jsonnode transformation for avro and parquet

* removed unneeded dependency from build.gradle

* fix checkstyle

* add DateTimeUtilsTest

* add AvroRecordHelperTest

* resolve merge conflicts | fix checkstyle

* update LocalTime parsing

* added String type to avro schema for Logical Types, removed date-time conversion

* fix checkstyle

* fix checkstyle

* added static String schema, added comments

* bump version

Co-authored-by: vmaltsev <[email protected]>
  • Loading branch information
VitaliiMaltsev and vmaltsev authored Nov 30, 2021
1 parent c97fe68 commit 065bbf6
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362",
"name": "S3",
"dockerRepository": "airbyte/destination-s3",
"dockerImageTag": "0.1.13",
"dockerImageTag": "0.1.14",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3",
"icon": "s3.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.1.13
dockerImageTag: 0.1.14
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
- name: SFTP-JSON
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.13
LABEL io.airbyte.version=0.1.14
LABEL io.airbyte.name=airbyte/destination-s3
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class JsonToAvroSchemaConverter {
private static final Schema UUID_SCHEMA = LogicalTypes.uuid()
.addToSchema(Schema.create(Schema.Type.STRING));
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
private static final Logger LOGGER = LoggerFactory.getLogger(JsonToAvroSchemaConverter.class);
private static final Schema TIMESTAMP_MILLIS_SCHEMA = LogicalTypes.timestampMillis()
.addToSchema(Schema.create(Schema.Type.LONG));
Expand Down Expand Up @@ -172,7 +173,20 @@ Schema getSingleFieldType(final String fieldName, final JsonSchemaType fieldType

final Schema fieldSchema;
switch (fieldType) {
case STRING, NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType());
case NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType());
case STRING -> {
if (fieldDefinition.has("format")) {
String format = fieldDefinition.get("format").asText();
fieldSchema = switch (format) {
case "date-time" -> LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
case "date" -> LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
case "time" -> LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
default -> Schema.create(fieldType.getAvroType());
};
} else {
fieldSchema = Schema.create(fieldType.getAvroType());
}
}
case COMBINED -> {
final Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
final List<Schema> unionTypes = getSchemasFromTypes(fieldName, (ArrayNode) combinedRestriction.get());
Expand Down Expand Up @@ -240,6 +254,14 @@ Schema getNullableFieldTypes(final String fieldName, final JsonNode fieldDefinit
if (!nonNullFieldTypes.contains(NULL_SCHEMA)) {
nonNullFieldTypes.add(0, NULL_SCHEMA);
}
// Logical types are converted to a union of logical type itself and string. The purpose is to
// default the logical type field to a string, if the value of the logical type field is invalid and
// cannot be properly processed.
if ((nonNullFieldTypes
.stream().anyMatch(schema -> schema.getLogicalType() != null)) &&
(!nonNullFieldTypes.contains(STRING_SCHEMA))) {
nonNullFieldTypes.add(STRING_SCHEMA);
}
return Schema.createUnion(nonNullFieldTypes);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,12 @@
"fields": [
{
"name": "created_at",
"type": ["null", "string", "int"],
"type": [
"null",
{ "type": "long", "logicalType": "timestamp-micros" },
"string",
"int"
],
"default": null
},
{
Expand Down Expand Up @@ -451,7 +456,12 @@
"fields": [
{
"name": "created_at",
"type": ["null", "string", "int"],
"type": [
"null",
{ "type": "long", "logicalType": "timestamp-micros" },
"string",
"int"
],
"default": null
},
{
Expand All @@ -473,7 +483,7 @@
},
"avroObject": {
"user": {
"created_at": "1634982000",
"created_at": 1634982000,
"_airbyte_additional_properties": null
},
"_airbyte_additional_properties": null
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.14 | 2021-11-09 | [\#7732](https://github.com/airbytehq/airbyte/pull/7732) | Support timestamp in Avro and Parquet |
| 0.1.13 | 2021-11-03 | [\#7288](https://github.com/airbytehq/airbyte/issues/7288) | Support Json `additionalProperties`. |
| 0.1.12 | 2021-09-13 | [\#5720](https://github.com/airbytehq/airbyte/issues/5720) | Added configurable block size for stream. Each stream is limited to 10,000 by S3 |
| 0.1.11 | 2021-09-10 | [\#5729](https://github.com/airbytehq/airbyte/pull/5729) | For field names that start with a digit, a `_` will be appended at the beginning for the`Parquet` and `Avro` formats. |
Expand Down

0 comments on commit 065bbf6

Please sign in to comment.