Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use external config schema to construct Python SchemaTransform payload #26100

Merged
merged 5 commits into from
Apr 19, 2023

Conversation

ahmedabu98
Copy link
Contributor

Development on external SchemaTransforms may lead to changes in how their configurations look (ie. adding and removing fields, changing the order of fields). In addition to that, currently most Java SchemaTransformProviders inherit from TypedSchemaTransformProvider, which infers its configuration schema by using a configuration class and AutoValueSchema. While this approach is very convenient, the ordering of fields in the inferred schema is unfortunately not consistent. All of this is to say that the configuration schema of external transforms is prone to changes.

When we use an external SchemaTransform in Python, we build a payload that includes the configuration fields. These are the same fields used to set up the external SchemaTransform. Currently, we only use the input kwargs to construct the payload, so we are blind to what the external configuration schema actually is. The changes in this PR make it so that we first fetch the external configuration schema then construct the payload in accordance to that schema.

@ahmedabu98
Copy link
Contributor Author

R: @chamikaramj

@github-actions github-actions bot added the python label Apr 4, 2023
@github-actions
Copy link
Contributor

github-actions bot commented Apr 4, 2023

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@codecov
Copy link

codecov bot commented Apr 4, 2023

Codecov Report

Merging #26100 (1174c22) into master (619dcfa) will increase coverage by 0.58%.
The diff coverage is 80.95%.

@@            Coverage Diff             @@
##           master   #26100      +/-   ##
==========================================
+ Coverage   71.41%   72.00%   +0.58%     
==========================================
  Files         782      748      -34     
  Lines      102856   101109    -1747     
==========================================
- Hits        73457    72805     -652     
+ Misses      27922    26827    -1095     
  Partials     1477     1477              
Flag Coverage Δ
python 81.06% <80.95%> (+1.10%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
sdks/python/apache_beam/io/gcp/bigquery.py 69.79% <ø> (-0.03%) ⬇️
sdks/python/apache_beam/transforms/external.py 81.12% <80.95%> (+0.60%) ⬆️

... and 52 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@ahmedabu98
Copy link
Contributor Author

Run Python_Xlang_Gcp_Direct PostCommit

@ahmedabu98
Copy link
Contributor Author

Run Python_Xlang_Gcp_Dataflow PostCommit

self._kwargs = kwargs

def _get_schema_proto_and_payload(self, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you also want to check that there are no kwargs beyond those in the schema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah good idea, will add that check

Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

@@ -180,14 +180,52 @@ def _get_named_tuple_instance(self):


class SchemaTransformPayloadBuilder(PayloadBuilder):
def __init__(self, identifier, **kwargs):
self._identifier = identifier
def __init__(self, schematransform_config, strict_schema=False, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be possible to use SchemaTransforms without the full config or the schema (i.e. just using the schema transform ID and a set of kwargs). Can you adjust the change so that the additional validation is optional ?

self._kwargs = kwargs

def _get_schema_proto_and_payload(self, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move the additional checks before this call and continue to use the existing external._get_schema_proto_and_payload() method ?

"SchemaTransform's configuration fields: %s" %
(kwargs_fields, external_config_schema_fields))

# The discover API allows us to obtain an ordered configuration schema
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think instead of the "strict_schema" option, we should do a "rearrange_based_on_discovery" option. If the option is not provided, we use kwargs as is without the overhead of the additional RPC (this will work for anything other than TypedSchemaTransformProvider). For TypedSchemaTransformProvider, we would set the "rearrange_based_on_discovery" option to true and would rearrange kwargs based on a discovery call before the "_get_schema_proto_and_payload" invocation. WDYT ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I like keeping the less costly option as the default. So the default can continue using the existing _get_schema_proto_and_payload(). However, I think we can't use this method for the rearrange_based_on_discovery option because it builds the payload based off kwargs only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can rearrange kwargs before the method call and use the same method, can't we ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that probably works actually, will try that

@ahmedabu98
Copy link
Contributor Author

Run Python_Xlang_Gcp_Direct PostCommit

@ahmedabu98
Copy link
Contributor Author

Run Portable_Python PreCommit

@ahmedabu98
Copy link
Contributor Author

ahmedabu98 commented Apr 6, 2023

beam_PreCommit_Portable_Python_Commit failing with:

Project 'py${lowestSupported}' not found in project ':sdks:python:test-suites:portable'

Cause by #26121 (comment), fix in #26151

@ahmedabu98
Copy link
Contributor Author

R: @chamikaramj

Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Just one comment.

sdks/python/apache_beam/transforms/external.py Outdated Show resolved Hide resolved
@ahmedabu98 ahmedabu98 requested a review from chamikaramj April 18, 2023 16:53
Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants