Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Destination S3 Support writing timestamps #7732

Merged
merged 17 commits into from
Nov 30, 2021

Conversation

VitaliiMaltsev
Copy link
Contributor

@VitaliiMaltsev VitaliiMaltsev commented Nov 8, 2021

What

Support writing timestamps for S3 in Avro and Parquet formats

How

According avro documentation https://avro.apache.org/docs/current/spec.html#Date.
Date

A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch, 1 January 1970 (ISO calendar).

Time (microsecond precision)

A time-micros logical type annotates an Avro long, where the long stores the number of microseconds after midnight, 00:00:00.000000.

Timestamp (microsecond precision)

A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds from the unix epoch, 1 January 1970 00:00:00.000000 UTC.

avro1
avro2

Recommended reading order

  1. x.java
  2. y.python

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/SUMMARY.md
    • docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • docs/integrations/README.md
    • airbyte-integrations/builds.md
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • Credentials added to Github CI. Instructions.
  • /test connector=connectors/<name> command is passing.
  • New Connector version released on Dockerhub by running the /publish command described here
  • After the connector is published, connector added to connector index as described here
  • Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here

Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • Credentials added to Github CI. Instructions.
  • /test connector=connectors/<name> command is passing.
  • New Connector version released on Dockerhub by running the /publish command described here
  • After the new connector version is published, connector version bumped in the seed directory as described here
  • Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here

Connector Generator

  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • Documentation which references the generator is updated as needed.

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


vmaltsev seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

@github-actions github-actions bot added the area/connectors Connector related issues label Nov 8, 2021
@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 8, 2021 11:15 Inactive
@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 8, 2021 11:18 Inactive
@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 8, 2021 14:20 Inactive
@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 8, 2021 14:56 Inactive
vmaltsev added 2 commits November 8, 2021 16:58
# Conflicts:
#	airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java
#	airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/parquet/S3ParquetWriter.java
@github-actions github-actions bot added the area/documentation Improvements or additions to documentation label Nov 8, 2021
@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 8, 2021 15:10 Inactive
@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 8, 2021 16:05 Inactive
@VitaliiMaltsev VitaliiMaltsev changed the title Vmaltsev/4610 destination s3 write timestamps Destination S3 Support writing timestamps Nov 9, 2021
@VitaliiMaltsev VitaliiMaltsev changed the title Destination S3 Support writing timestamps 🎉 Destination S3 Support writing timestamps Nov 9, 2021
@VitaliiMaltsev VitaliiMaltsev linked an issue Nov 9, 2021 that may be closed by this pull request
@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 11, 2021 09:31 Inactive
@VitaliiMaltsev VitaliiMaltsev marked this pull request as ready for review November 11, 2021 10:18
Copy link
Contributor

@tuliren tuliren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry about the delay.

I spent quite some time on this, and the main concern I have for the current implementation is that we need to iterate through the Json object and modify it.

I think a better approach is to 1) keep the Json -> Avro schema converter change in this PR, 2) update the Json -> Avro object converter in https://github.com/airbytehq/json-avro-converter. Here is probably the entry point.

In this way, the Json object does not need to be mutated, and it is only iterated through once (in the Json -> Avro object converter). It also makes support more formats easier in the future, because we don't need to maintain the mutation methods in AvroRecordHelper.java. Instead, the object converter already has a pretty solid iteration framework that we can extend.

Let me know what you think.

@VitaliiMaltsev
Copy link
Contributor Author

@VitaliiMaltsev, I have added you as a maintainer of that repo. By the way, when you create PR, please change the base of the PR to airbytehq/json-avro-converter (highlighted in blue): Screen Shot 2021-11-15 at 09 06 16

Otherwise the PR will be created on the original repo.

created PR airbytehq/json-avro-converter#9. Please review

@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 18, 2021 10:39 Inactive
@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 18, 2021 11:29 Inactive
@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 18, 2021 12:48 Inactive
Copy link
Contributor

@tuliren tuliren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice.

Can you also update the doc here to include how the logical types you added here are processed?

if ((nonNullFieldTypes
.stream().anyMatch(schema -> schema.getLogicalType() != null)) &&
(!nonNullFieldTypes.contains(Schema.create(Schema.Type.STRING)))) {
nonNullFieldTypes.addLast(Schema.create(Schema.Type.STRING));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the nonNullFieldTypes from the generic List to LinkedList does not seem necessary. The add method on the List interface is equivalent to addLast.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -240,6 +254,11 @@ Schema getNullableFieldTypes(final String fieldName, final JsonNode fieldDefinit
if (!nonNullFieldTypes.contains(NULL_SCHEMA)) {
nonNullFieldTypes.add(0, NULL_SCHEMA);
}
if ((nonNullFieldTypes
.stream().anyMatch(schema -> schema.getLogicalType() != null)) &&
(!nonNullFieldTypes.contains(Schema.create(Schema.Type.STRING)))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably better to save Schema.create(Schema.Type.STRING)) as a static private variable to avoid creating the string schema repeatedly. This if block does it twice: one in the check, and one to be appended to the field type list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -240,6 +254,11 @@ Schema getNullableFieldTypes(final String fieldName, final JsonNode fieldDefinit
if (!nonNullFieldTypes.contains(NULL_SCHEMA)) {
nonNullFieldTypes.add(0, NULL_SCHEMA);
}
if ((nonNullFieldTypes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comments before this if block to explain why the string schema is added at the end?

Something like:

Logical types are converted to a union of logical type itself and string. The purpose is to default the logical type field to a string, if the value of the logical type field is invalid and cannot be properly processed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 30, 2021 09:14 Inactive
@VitaliiMaltsev
Copy link
Contributor Author

VitaliiMaltsev commented Nov 30, 2021

/publish connector=connectors/destination-s3

🕑 connectors/destination-s3 https://github.com/airbytehq/airbyte/actions/runs/1520165378
❌ connectors/destination-s3 https://github.com/airbytehq/airbyte/actions/runs/1520165378

@jrhizor jrhizor temporarily deployed to more-secrets November 30, 2021 09:37 Inactive
@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 30, 2021 09:50 Inactive
# Conflicts:
#	airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json
#	airbyte-integrations/connectors/source-mongodb-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/mongodb/MongodbSourceStrictEncryptAcceptanceTest.java
@VitaliiMaltsev
Copy link
Contributor Author

VitaliiMaltsev commented Nov 30, 2021

/publish connector=connectors/destination-s3

🕑 connectors/destination-s3 https://github.com/airbytehq/airbyte/actions/runs/1520234304
✅ connectors/destination-s3 https://github.com/airbytehq/airbyte/actions/runs/1520234304

@VitaliiMaltsev VitaliiMaltsev temporarily deployed to more-secrets November 30, 2021 09:53 Inactive
@jrhizor jrhizor temporarily deployed to more-secrets November 30, 2021 09:54 Inactive
@VitaliiMaltsev VitaliiMaltsev merged commit 065bbf6 into master Nov 30, 2021
@VitaliiMaltsev VitaliiMaltsev deleted the vmaltsev/4610-destination-s3-write-timestamps branch November 30, 2021 10:08
@tuliren
Copy link
Contributor

tuliren commented Nov 30, 2021

Follow ups:

  • Update the Avro schema converter doc about the new logical type support.
  • Bump and publish the GCS destination connector to pick up this change.

@VitaliiMaltsev
Copy link
Contributor Author

VitaliiMaltsev commented Dec 1, 2021

Follow ups:

  • Update the Avro schema converter doc about the new logical type support.
  • Bump and publish the GCS destination connector to pick up this change.

@tuliren bumped GCS version and updated documentation #8360
Please review

schlattk pushed a commit to schlattk/airbyte that referenced this pull request Jan 4, 2022
* get date-time format form json schema

* created universal date-time converter

* implemented jsonnode transformation for avro and parquet

* removed unneeded dependency from build.gradle

* fix checkstyle

* add DateTimeUtilsTest

* add AvroRecordHelperTest

* resolve merge conflicts | fix checkstyle

* update LocalTime parsing

* added String type to avro schema for Logical Types, removed date-time conversion

* fix checkstyle

* fix checkstyle

* added static String schema, added comments

* bump version

Co-authored-by: vmaltsev <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Destination S3: Support writing timestamps
6 participants