
PIP-246: Improved PROTOBUF_NATIVE schema compatibility checks without using avro-protobuf #19565

Denovo1998 opened this issue Feb 19, 2023 · 1 comment


Motivation

Pulsar has built-in support for typed messages. It allows specifying an encoding scheme and its matching schema. Every time you produce a message, you first declare the schema definition you wish to use for your messages. Each time you specify a schema to be used, either by a producer or a consumer, the schema is persisted in Pulsar and given an increasing version number. If the schema is the same as the previous version, it is not saved again. When a message is persisted, the schema version number is encoded in the message headers.

But to meet new business requirements, schemas need to evolve over time, under version control.

So Pulsar provides a very useful feature named Schema Evolution:
https://pulsar.apache.org/docs/next/schema-understand/#schema-evolution

It allows us to check whether a new schema version is compatible with the previous version or versions. When you configure the schema for a topic, you decide the strategy used for the validation check. The strategies validate the following:

  • BACKWARD
    • A consumer with newSchema can read a message written using existingSchema.
  • BACKWARD_TRANSITIVE
    • A consumer with newSchema can read messages written using all existingSchema.
  • FORWARD
    • A consumer with existingSchema can read messages written using newSchema.
  • FORWARD_TRANSITIVE
    • A consumer defined with any of the existingSchema can read messages written using newSchema.
  • FULL
    • A consumer defined with newSchema can read messages written using existingSchema.
    • A consumer defined with existingSchema can read messages written using newSchema.
  • FULL_TRANSITIVE
    • A consumer defined with newSchema can read messages written using any of the existingSchema.
    • A consumer defined with any of the existingSchema can read messages written using newSchema.

Aside from Avro, Pulsar has two additional supported encodings: PROTOBUF and PROTOBUF_NATIVE.

(1) PROTOBUF encodes the messages using Protobuf encoding, but the schema persisted to Pulsar is not the Protobuf Descriptor you might expect. The saved schema is a translation of the Protobuf Descriptor into an Avro schema, so in fact an Avro schema definition is saved. (Because avro-protobuf can convert a Protobuf schema into an Avro schema, Avro's compatibility checks can then be reused.)

(2) PROTOBUF_NATIVE was created to fix that shortcoming, by actually persisting the Protobuf Descriptor and using Protobuf for encoding. It was designed not to use avro-protobuf for Protobuf schema compatibility checking.
The root message name is the class name we pass in when we create the producer or consumer. A Protobuf schema can have many nested messages and dependencies, but the current implementation (ProtobufNativeSchemaCompatibilityCheck) only checks whether the passed root message name is the same. It does not check whether the fields have changed in a way that is compatible with older versions of the schema.

Goal

  1. The goal of this PIP is to amend PROTOBUF_NATIVE by adding a more advanced and complete, but not redundant, custom validation for every defined schema compatibility strategy.
  2. The second goal is to allow users to choose between different implementations of the PROTOBUF_NATIVE schema compatibility check: the new Protobuf-based check or the existing PROTOBUF_NATIVE compatibility check.

API Changes

  1. Add ProtobufNativeSchemaValidator, used as a validator to check the compatibility of two Protobuf schemas. The validator has a single method, canRead, which checks whether data written with one schema can be read with the other.
public interface ProtobufNativeSchemaValidator {

    /**
     * Checks whether a message written with {@code writtenSchema} can be read
     * with {@code readSchema}; throws if the schemas are incompatible.
     */
    void canRead(Descriptors.Descriptor writtenSchema, Descriptors.Descriptor readSchema)
            throws ProtoBufCanReadCheckException;

}
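To make the contract concrete, here is a minimal illustrative implementation that enforces only the root-message-name rule, which is essentially what the existing check does. SimpleDescriptor, RootMessageNameValidator, and the local copies of the interface and exception are hypothetical stand-ins so the sketch compiles without protobuf-java or Pulsar on the classpath; the real validator operates on Descriptors.Descriptor.

```java
// Illustrative sketch only. SimpleDescriptor is a hypothetical stand-in for
// com.google.protobuf.Descriptors.Descriptor so this compiles without
// protobuf-java; the interface and exception are local copies for the same reason.
class ProtoBufCanReadCheckException extends Exception {
    ProtoBufCanReadCheckException(String message) { super(message); }
}

record SimpleDescriptor(String fullName) {}

interface ProtobufNativeSchemaValidator {
    void canRead(SimpleDescriptor writtenSchema, SimpleDescriptor readSchema)
            throws ProtoBufCanReadCheckException;
}

// Enforces only rule (1): the root message names must be identical.
class RootMessageNameValidator implements ProtobufNativeSchemaValidator {
    @Override
    public void canRead(SimpleDescriptor writtenSchema, SimpleDescriptor readSchema)
            throws ProtoBufCanReadCheckException {
        if (!writtenSchema.fullName().equals(readSchema.fullName())) {
            throw new ProtoBufCanReadCheckException("Root message names differ.");
        }
    }
}
```

A richer validator would implement the same canRead signature but walk both descriptors' fields, applying the rules listed under Implementation below.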

Implementation

  1. As documented on the website (https://pulsar.apache.org/docs/next/schema-understand/#schema-compatibility-check), the default schema compatibility check for PROTOBUF_NATIVE is org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck. This PIP implements ProtobufNativeSchemaAdvanceCompatibilityCheck, a more comprehensive set of compatibility check rules.

  2. The Pulsar website describes the schema compatibility check strategies: https://pulsar.apache.org/docs/next/schema-understand/#schema-compatibility-check-strategy

When we create a topic, we configure a schema compatibility check strategy. This strategy dictates exactly what to check when a new schema is registered. Sometimes the consumer registers the new schema, and sometimes the producer does. One thing is always true: messages are written using a certain schema (version) and later read using a different schema (version). So the two parameters of canRead are named writtenSchema and readSchema, and the method checks whether data written with writtenSchema can be read with readSchema.

The checks those strategies dictate can be summarized as follows:

  • BACKWARD: We verify that newSchema can read existingSchema. So the existingSchema is writtenSchema, and newSchema is readSchema.
  • BACKWARD_TRANSITIVE: We verify that newSchema can read all of existingSchema.
  • FORWARD: We verify that existingSchema can read newSchema. So the newSchema is writtenSchema, and existingSchema is readSchema.
  • FORWARD_TRANSITIVE: We verify that all of existingSchema can read newSchema.
  • FULL: We verify that newSchema can read existingSchema, and existingSchema also can read newSchema.
  • FULL_TRANSITIVE: We verify that newSchema can read all of existingSchema, and all of existingSchema also can read newSchema.
    // Transitive strategies check the new schema against every existing
    // version; non-transitive strategies check only the latest existing version.
    switch (strategy) {
        case BACKWARD_TRANSITIVE -> {
            for (Descriptor existingSchema : existingSchemasList) {
                protobufNativeSchemaBreakValidator.canRead(existingSchema, newSchema);
            }
        }
        case BACKWARD -> {
            if (existingSchemas.hasNext()) {
                Descriptor existingSchema = existingSchemas.next();
                protobufNativeSchemaBreakValidator.canRead(existingSchema, newSchema);
            }
        }
        case FORWARD_TRANSITIVE -> {
            for (Descriptor existingSchema : existingSchemasList) {
                protobufNativeSchemaBreakValidator.canRead(newSchema, existingSchema);
            }
        }
        case FORWARD -> {
            if (existingSchemas.hasNext()) {
                Descriptor existingSchema = existingSchemas.next();
                protobufNativeSchemaBreakValidator.canRead(newSchema, existingSchema);
            }
        }
        case FULL_TRANSITIVE -> {
            for (Descriptor existingSchema : existingSchemasList) {
                protobufNativeSchemaBreakValidator.canRead(existingSchema, newSchema);
                protobufNativeSchemaBreakValidator.canRead(newSchema, existingSchema);
            }
        }
        case FULL -> {
            if (existingSchemas.hasNext()) {
                Descriptor existingSchema = existingSchemas.next();
                protobufNativeSchemaBreakValidator.canRead(existingSchema, newSchema);
                protobufNativeSchemaBreakValidator.canRead(newSchema, existingSchema);
            }
        }
        case ALWAYS_COMPATIBLE -> {
            // No checks needed.
            return;
        }
        default -> throw new ProtoBufCanReadCheckException("Unknown SchemaCompatibilityStrategy.");
    }
  3. We define our own basic compatibility rules in the canRead check, and the implementation of canRead applies different compatibility checks for different versions of Protobuf (proto2, proto3). The canRead check only determines whether writtenSchema can be read by readSchema, without considering the schema compatibility check strategy.

The basic compatibility rules we have defined follow the Protobuf official guidance on updating message types (https://protobuf.dev/programming-guides/proto/#updating):

(1) If the root message names of writtenSchema and readSchema are different, the schemas are incompatible.
This rule applies to both proto2 and proto3.

(2) No changes are allowed to required fields.

  • The writtenSchema cannot add required fields, but optional or repeated fields can be added (the field number must be new).
  • The writtenSchema cannot remove required fields that exist in the readSchema.
    This rule applies to proto2. proto3 removed required, so with proto3 this rule is not checked.
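Rule (2) can be sketched as a comparison of the required-field sets of the two schemas. FieldInfo here is a hypothetical stand-in for protobuf's FieldDescriptor, used so the sketch is self-contained; it is not the proposed implementation.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative sketch of rule (2) under proto2. FieldInfo is a hypothetical
// stand-in for protobuf's FieldDescriptor.
record FieldInfo(String name, int number, boolean required) {}

final class RequiredFieldRule {
    private RequiredFieldRule() {}

    /**
     * Compatible only when written and read schemas agree on the set of
     * required field numbers: adding a required field (written-only) or
     * removing one (read-only) both break compatibility.
     */
    static boolean check(List<FieldInfo> writtenFields, List<FieldInfo> readFields) {
        Set<Integer> writtenRequired = writtenFields.stream()
                .filter(FieldInfo::required)
                .map(FieldInfo::number)
                .collect(Collectors.toSet());
        Set<Integer> readRequired = readFields.stream()
                .filter(FieldInfo::required)
                .map(FieldInfo::number)
                .collect(Collectors.toSet());
        return writtenRequired.equals(readRequired);
    }
}
```

Note that adding an optional or repeated field leaves the required set unchanged, so it passes, as the rule allows.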

(3) The writtenSchema cannot change the field number of any field in readSchema (same field name, different field number).
This rule applies to both proto2 and proto3.
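Rule (3) amounts to checking that every field name present in both schemas keeps its number. The name-to-number map below is an illustrative simplification of each schema's field list, not the proposed implementation:

```java
import java.util.Map;

// Illustrative sketch of rule (3). The name->number maps are a simplified,
// hypothetical representation of each schema's fields.
final class FieldNumberRule {
    private FieldNumberRule() {}

    /** Compatible only if no field name present in both schemas was renumbered. */
    static boolean check(Map<String, Integer> writtenFields, Map<String, Integer> readFields) {
        return readFields.entrySet().stream()
                .filter(e -> writtenFields.containsKey(e.getKey()))
                .allMatch(e -> writtenFields.get(e.getKey()).equals(e.getValue()));
    }
}
```

Fields that exist in only one of the two schemas are ignored here; they are handled by the other rules.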

(4) The writtenSchema keeps a field's name and number but changes the field type. In this case we only log a warning:

log.warn("The field type for a field with number {} has been changed.", readSchemaFieldInfo.getNumber());

The Alternatives section of this PIP describes the field-type compatibility rules recorded on the Protobuf official website. But we should not add their implementation to this design, as concluded in the discussion:

If a number is parsed from the wire which doesn’t fit in the corresponding type, you will get the same effect as if you had cast the number to that type in C++ (for example, if a 64-bit number is read as an int32, it will be truncated to 32 bits).
I personally wouldn’t rely on such compatibility guarantees in a real application.
If my check amount (> int32 lol) would be truncated because someone changed the field type in a schema, I would be quite upset.
This rule applies to both proto2 and proto3.

(5) A field that exists in both readSchema and writtenSchema with the same field number cannot have different default values (https://protobuf.dev/programming-guides/dos-donts/#dont-change-the-default-value-of-a-field).
This rule applies to proto2. In proto3 you cannot set an explicit default value, so with proto3 this rule is not checked.

(6) A field that exists in both writtenSchema and readSchema cannot be repeated with a scalar type (https://protobuf.dev/programming-guides/proto/#scalar) in writtenSchema while no longer repeated in readSchema.
This rule applies to proto2. In proto3 required has been removed, so this rule is not checked for required fields, but repeated and optional fields are still checked.

(7) A field that exists in both writtenSchema and readSchema cannot be a packed scalar (packed=true) in writtenSchema while repeated and unpacked (packed=false) in readSchema.
This rule applies to both proto2 and proto3.

(8) When writtenSchema and readSchema use different Protobuf syntax versions (one proto2, the other proto3):
Different versions of Protobuf are not compatible, and an exception like this is thrown:

throw new ProtoBufCanReadCheckException("Protobuf syntax have been changed.");

Alternatives

When a field type is changed, the Protobuf official website (https://protobuf.dev/programming-guides/proto/#updating) gives the following compatibility rules:

  • int32, uint32, int64, uint64, and bool are all compatible – this means you can change a field from one of these types to another without breaking forwards- or backwards-compatibility.
  • sint32 and sint64 are compatible with each other but are not compatible with the other integer types.
  • string and bytes are compatible as long as the bytes are valid UTF-8.
  • Embedded messages are compatible with bytes if the bytes contain an encoded version of the message.
  • fixed32 is compatible with sfixed32, and fixed64 with sfixed64.
  • enum is compatible with int32, uint32, int64, and uint64 in terms of wire format (note that values will be truncated if they don’t fit).
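If this alternative were ever implemented, the groups above could be encoded as a simple lookup table. The sketch below is only an illustration of that idea: it uses plain type-name strings where a real check would use protobuf's FieldDescriptor.Type, and it ignores the caveats noted above (valid UTF-8 for string/bytes, value truncation for enum/int).

```java
import java.util.List;
import java.util.Set;

// Illustrative lookup of the wire-compatible scalar-type groups listed above.
// Type names are plain strings here; a real check would use FieldDescriptor.Type.
final class WireCompatibleTypes {
    private static final List<Set<String>> GROUPS = List.of(
            Set.of("int32", "uint32", "int64", "uint64", "bool"),
            Set.of("sint32", "sint64"),
            Set.of("string", "bytes"),
            Set.of("fixed32", "sfixed32"),
            Set.of("fixed64", "sfixed64"),
            Set.of("enum", "int32", "uint32", "int64", "uint64"));

    /** True when the two types are identical or share a compatibility group. */
    static boolean compatible(String from, String to) {
        return from.equals(to)
                || GROUPS.stream().anyMatch(g -> g.contains(from) && g.contains(to));
    }
}
```

As the quoted discussion argues, relying on these wire-level guarantees (for example, silent truncation of a 64-bit value read as int32) is risky in real applications, which is why the PIP treats any type change as a warning rather than implementing this table.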

Anything else?

Links

Discussion: https://lists.apache.org/thread/c59qqzcf77w7gm9tq7thdmg0lt3qf5w8
Vote:
PR: #19566
