
[Bug]: Configuration row arguments may get misplaced between Python SchemaTransformPayload encoding and Java RowCoder decoding #25669

Closed
ahmedabu98 opened this issue Feb 28, 2023 · 4 comments · Fixed by #25685

Comments

@ahmedabu98
Contributor

What happened?

While testing a SchemaTransform Python wrapper (#25521), I found that the kwargs had to be passed in a particular order for the input arguments to reach the right fields of the Java transform. This is surprising because the ordering of kwargs should have no impact.

For example, where self._table="my_project:my_dataset.xlang_table",

the following works fine:

external_storage_write = SchemaAwareExternalTransform(
    identifier=self.schematransform_config.identifier,
    expansion_service=self._expansion_service,
    createDisposition=self._create_disposition,
    writeDisposition=self._write_disposition,     #<---
    triggeringFrequencySeconds=self._triggering_frequency,
    useAtLeastOnceSemantics=self._use_at_least_once,
    table=self._table)                            #<---

and I get a configuration object in Java transform that looks like this:

BigQueryStorageWriteApiSchemaTransformConfiguration{
  table=my_project:my_dataset.xlang_table, 
  createDisposition=, 
  writeDisposition=, 
  triggeringFrequencySeconds=0, 
  useAtLeastOnceSemantics=false}

However, if I change the kwargs to look like this (swapping the positions of table and writeDisposition):

external_storage_write = SchemaAwareExternalTransform(
    identifier=self.schematransform_config.identifier,
    expansion_service=self._expansion_service,
    createDisposition=self._create_disposition,
    table=self._table,                            #<---
    triggeringFrequencySeconds=self._triggering_frequency,
    useAtLeastOnceSemantics=self._use_at_least_once,
    writeDisposition=self._write_disposition)     #<---

I get the following configuration object. Notice the value intended for table is now in the writeDisposition field.

BigQueryStorageWriteApiSchemaTransformConfiguration{
  table=, 
  createDisposition=, 
  writeDisposition=my_project:my_dataset.xlang_table, 
  triggeringFrequencySeconds=0, 
  useAtLeastOnceSemantics=false}

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@chamikaramj chamikaramj self-assigned this Mar 1, 2023
@ahmedabu98 ahmedabu98 changed the title [Bug]: Different orderings of SchemaAwareExternalTransform() kwargs may result in misplaced arguments [Bug]: Configuration row arguments may get misplaced between Python SchemaTransformPayload encoding and Java RowCoder decoding Mar 24, 2023
@ahmedabu98
Contributor Author

Following this unittest, I tried encoding my configuration with SchemaTransformPayloadBuilder() and decoding it with proto_utils.parse_Bytes(). It returned a configuration with the fields in the right order.

Not sure what the mismatch is between how Python and Java handle the payload.

@ahmedabu98
Contributor Author

ahmedabu98 commented Mar 26, 2023

Update: we found that this problem is due to line 342 in the Schema::sorted method below, which copies the field encoding positions of the original, unsorted Schema over to the new sorted Schema. The resulting mismatch between encoding positions and field indices in the sorted Schema causes this issue: when RowCoder encodes or decodes a field, it looks up that field's encoding position. Here, RowCoder uses the encoding positions of the pre-sorted Schema to decode a Row built against the sorted Schema, which produces what looks like field misplacement.

public Schema sorted() {
  // Create a new Schema and copy over the appropriate attributes:
  // {fields, uuid, encodingPositions, options}
  Schema sortedSchema =
      this.fields.stream()
          .sorted(Comparator.comparing(Field::getName))
          .collect(Schema.toSchema())
          .withOptions(getOptions());
  sortedSchema.setUUID(getUUID());
  // Line 342: copies the unsorted Schema's encoding positions verbatim.
  sortedSchema.setEncodingPositions(getEncodingPositions());
  return sortedSchema;
}
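To make the mechanism concrete, here is a toy sketch in plain Python (not Beam internals, and all helper names are hypothetical). It encodes a row against the sorted schema but decodes it using the stale name-to-position map copied from the unsorted schema; the exact misplacement pattern differs from the report above, but the mechanism is the same:

```python
# Field order as declared on the unsorted schema.
original_fields = ["table", "createDisposition", "writeDisposition"]
sorted_fields = sorted(original_fields)
# -> ["createDisposition", "table", "writeDisposition"]

# Encoding positions copied verbatim from the unsorted schema: name -> old index.
stale_positions = {name: i for i, name in enumerate(original_fields)}

row = {
    "table": "my_project:my_dataset.xlang_table",
    "createDisposition": "",
    "writeDisposition": "",
}

# The encoder writes field i of the *sorted* schema into wire slot i.
wire = [row[name] for name in sorted_fields]

# A decoder that trusts the stale positions reads each field from the
# wrong wire slot, so values land in the wrong fields.
decoded = {name: wire[stale_positions[name]] for name in sorted_fields}
```

Here `decoded["table"]` comes back empty and the table value surfaces under a different field name, mirroring the misplacement seen in the Java configuration object.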

Context: TypedSchemaTransformProvider uses a class to represent the configuration of the SchemaTransform. The configuration schema is inferred from that class using AutoValueSchema. Unfortunately, the ordering of fields in the generated configuration schema is not guaranteed, so as a workaround we sort the fields alphabetically.
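In toy terms, the invariant a fix has to restore is that the encoding positions agree with the sorted field order (this is only an illustration of the invariant, not the actual patch in #25685):

```python
original_fields = ["table", "createDisposition", "writeDisposition"]
sorted_fields = sorted(original_fields)

# Instead of copying the old name -> index map verbatim, derive positions
# that are consistent with the sorted field order.
fixed_positions = {name: i for i, name in enumerate(sorted_fields)}

row = {
    "table": "my_project:my_dataset.xlang_table",
    "createDisposition": "",
    "writeDisposition": "",
}

wire = [row[name] for name in sorted_fields]
decoded = {name: wire[fixed_positions[name]] for name in sorted_fields}
assert decoded == row  # every value lands back in its own field
```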

@Abacn
Contributor

Abacn commented Apr 28, 2023

Observed similar issues in #26480. The fix in #25685 appears to be generic to schemas, but the same problem still happens for the SchemaIO wrapper (used by the JDBC xlang transforms).

@chamikaramj
Contributor

chamikaramj commented May 3, 2023

jdbc.py has a similar issue, but that's not addressed here. Transforms in jdbc.py need to preserve ordering since they use a config object [1] along with the (old) SchemaIO API. This config object is encoded on the Python side and decoded on the Java side, so the ordering of the fields on the two sides has to match.

One way to fix this would be to update jdbc.py to use schema-aware transforms (which would pick up the fix provided in this PR).

[1]
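For context on why jdbc.py is ordering-sensitive: with the old SchemaIO-style approach, the config is effectively a positional row, so the Python-side field declaration order must line up with the schema the Java side expects. A toy sketch (hypothetical config fields, not the real jdbc.py code):

```python
from typing import NamedTuple

# Hypothetical config class; the real jdbc.py config has different fields.
class JdbcConfig(NamedTuple):
    driver_class_name: str
    jdbc_url: str
    username: str

config = JdbcConfig("org.postgresql.Driver",
                    "jdbc:postgresql://localhost:5432/db", "beam")

# A positional encoding is just the values in declaration order; the
# decoder assigns meaning purely by position.
wire = tuple(config)

# If the Java side expects a different field order, the same bytes
# decode into the wrong fields with no error raised.
java_field_order = ["jdbc_url", "driver_class_name", "username"]
decoded = dict(zip(java_field_order, wire))
```

This is why moving jdbc.py onto schema-aware transforms, where fields are matched by name rather than position, would remove the ordering constraint.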
