diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json index 3d761889c720..3cea3a60802a 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json @@ -2,7 +2,7 @@ "destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362", "name": "S3", "dockerRepository": "airbyte/destination-s3", - "dockerImageTag": "0.1.13", + "dockerImageTag": "0.1.14", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3", "icon": "s3.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 39e0fe8a2c1c..9f36eecb7cfd 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -139,7 +139,7 @@ - name: S3 destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 dockerRepository: airbyte/destination-s3 - dockerImageTag: 0.1.13 + dockerImageTag: 0.1.14 documentationUrl: https://docs.airbyte.io/integrations/destinations/s3 icon: s3.svg - name: SFTP-JSON diff --git a/airbyte-integrations/connectors/destination-s3/Dockerfile b/airbyte-integrations/connectors/destination-s3/Dockerfile index aab1915a9fbd..d5ae917a3a32 100644 --- a/airbyte-integrations/connectors/destination-s3/Dockerfile +++ b/airbyte-integrations/connectors/destination-s3/Dockerfile @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.13 +LABEL io.airbyte.version=0.1.14 LABEL io.airbyte.name=airbyte/destination-s3 diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java index 9460eeb152be..e723beacc1c8 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java @@ -40,6 +40,7 @@ public class JsonToAvroSchemaConverter { private static final Schema UUID_SCHEMA = LogicalTypes.uuid() .addToSchema(Schema.create(Schema.Type.STRING)); private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL); + private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING); private static final Logger LOGGER = LoggerFactory.getLogger(JsonToAvroSchemaConverter.class); private static final Schema TIMESTAMP_MILLIS_SCHEMA = LogicalTypes.timestampMillis() .addToSchema(Schema.create(Schema.Type.LONG)); @@ -172,7 +173,20 @@ Schema getSingleFieldType(final String fieldName, final JsonSchemaType fieldType final Schema fieldSchema; switch (fieldType) { - case STRING, NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType()); + case NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType()); + case STRING -> { + if (fieldDefinition.has("format")) { + String format = fieldDefinition.get("format").asText(); + fieldSchema = switch (format) { + case "date-time" -> LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); + case "date" -> LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); + case "time" -> LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)); + default -> Schema.create(fieldType.getAvroType()); + }; + } else { + fieldSchema = Schema.create(fieldType.getAvroType()); + } + } case COMBINED -> { final Optional combinedRestriction = getCombinedRestriction(fieldDefinition); final List unionTypes = getSchemasFromTypes(fieldName, (ArrayNode) combinedRestriction.get()); @@ -240,6 +254,14 @@ Schema getNullableFieldTypes(final String fieldName, final JsonNode fieldDefinit if (!nonNullFieldTypes.contains(NULL_SCHEMA)) { nonNullFieldTypes.add(0, NULL_SCHEMA); } + // Logical types are converted to a union of logical type itself and string. The purpose is to + // default the logical type field to a string, if the value of the logical type field is invalid and + // cannot be properly processed. + if ((nonNullFieldTypes + .stream().anyMatch(schema -> schema.getLogicalType() != null)) && + (!nonNullFieldTypes.contains(STRING_SCHEMA))) { + nonNullFieldTypes.add(STRING_SCHEMA); + } return Schema.createUnion(nonNullFieldTypes); } } diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json index c611474f080f..b027dae755a4 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json @@ -388,7 +388,12 @@ "fields": [ { "name": "created_at", - "type": ["null", "string", "int"], + "type": [ + "null", + { "type": "long", "logicalType": "timestamp-micros" }, + "string", + "int" + ], "default": null }, { @@ -451,7 +456,12 @@ "fields": [ { "name": "created_at", - "type": ["null", "string", "int"], + "type": [ + "null", + { "type": "long", "logicalType": "timestamp-micros" }, + "string", + "int" + ], "default": null }, { @@ -473,7 +483,7 @@ }, "avroObject": { "user": { - "created_at": "1634982000", + "created_at": 1634982000, "_airbyte_additional_properties": null }, "_airbyte_additional_properties": null diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index d85654362606..cf5d9dbec13c 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -224,6 +224,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.14 | 2021-11-09 | [\#7732](https://github.com/airbytehq/airbyte/pull/7732) | Support timestamp in Avro and Parquet | | 0.1.13 | 2021-11-03 | [\#7288](https://github.com/airbytehq/airbyte/issues/7288) | Support Json `additionalProperties`. | | 0.1.12 | 2021-09-13 | [\#5720](https://github.com/airbytehq/airbyte/issues/5720) | Added configurable block size for stream. Each stream is limited to 10,000 by S3 | | 0.1.11 | 2021-09-10 | [\#5729](https://github.com/airbytehq/airbyte/pull/5729) | For field names that start with a digit, a `_` will be appended at the beginning for the`Parquet` and `Avro` formats. |