Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[schemaregistry]ProtobufNative Schema Support #8372

Merged
merged 12 commits into from
Nov 17, 2020

Conversation

hnail
Copy link
Contributor

@hnail hnail commented Oct 26, 2020

Fixes #7642
Fixes #7674

Motivation

Protobuf Native Schema Support : PIP-Add ProtobufNative Schema Support

This PR proposes to import a new protobuf-v3 schema based on protobuf native Descriptor, Current ProtobufSchema is based on AVRO schema, this Causes the following restrictions :

  • Current ProtobufSchema based on avro-protobuf only works when JVM classloader can load Java implementation class of GeneratedMessageV3 . this is unfriendly for long-running server ( e.g. Presto ), restart server for update class is unfriendly.
  • Describe protobuf schema by avro schema will cause losses of information, so based current ProtobufSchema can’t support AutoConsume by DynamicMessage.
  • The amount of support avro language is less than support protobuf language.

In consideration of backward compatibility , we add a new schema type SchemaType.PROTOBUF_NATIVE base on protobuf-v3 native Descriptor instead of modify SchemaType.PROTOBUF, aim to support GenericProtobufNativeSchema and AutoConsumeSchema for PROTOBUF.

Modifications

Describe in PIP-Add ProtobufNative Schema Support


Usage Example

Create ProtobufNative schema Example
SchemaDefinition def =  SchemaDefinition.<PBMessage>builder()
                .withAlwaysAllowNull(true)
                .withPojo(PBMessage.class)
                .build();
Schema schema = Schema.PROTOBUFNATIVE(def);
admin.schemas().createSchema(topic, schema.getSchemaInfo());
Class-Based schema consumer Example
Consumer<PBMessage> consumer = client.newConsumer(Schema.PROTOBUFNATIVE(PBMessage.class)).topic(topic)
                .subscriptionName("my-subscription-name").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
        while (!consumer.hasReachedEndOfTopic()){
            Message<PBMessage> msg = consumer.receive();
            PBMessage pBMessage =  msg.getValue();
            System.out.println("getPlatform : "+ pBMessage.getPlatform());
        }
ProtobufNative AUTO_CONSUME Example
 Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME()).topic(topic)
                .subscriptionName("my-subscription-nameaaa").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
        while (!consumer.hasReachedEndOfTopic()){
            Message<GenericRecord> msg = consumer.receive() ;
           // System.out.println("GenericRecord :"+msg.getValue().toString());
            GenericRecord genericRecord =  msg.getValue();
            System.out.println(genericRecord.getField("platform"));
            GenericProtobufNativeRecord genericProtobufNativeRecord = (GenericProtobufNativeRecord)genericRecord;
            DynamicMessage dynamicMessage = genericProtobufNativeRecord.getProtobufRecord();         
            String platform = dynamicMessage.getField(dynamicMessage.getDescriptorForType().findFieldByName("platform")).toString();
            System.out.println("platform : "+ platform);
        }

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: ( yes )
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

@hnail hnail changed the title [WIP]ProtobufNative Schema Support ProtobufNative Schema Support Oct 27, 2020
@hnail hnail changed the title ProtobufNative Schema Support [schemaregistry]ProtobufNative Schema Support Oct 27, 2020
@sijie sijie added this to the 2.7.0 milestone Oct 27, 2020
@sijie
Copy link
Member

sijie commented Oct 27, 2020

@hnail Can you send the PIP of adding native protobuf support to dev@ mailing list? So the community can review the PIP and we can add the PIP to Pulsar's wiki page.

@hnail
Copy link
Contributor Author

hnail commented Oct 28, 2020

@hnail Can you send the PIP of adding native protobuf support to dev@ mailing list? So the community can review the PIP and we can add the PIP to Pulsar's wiki page.

send <[DISCUSS] PIP-Add ProtobufNative Schema Support> to [email protected] done

@hnail
Copy link
Contributor Author

hnail commented Oct 28, 2020

/pulsarbot run-failure-checks

@hnail
Copy link
Contributor Author

hnail commented Nov 9, 2020

@sijie @codelipenghui Has there been progresses on this review ?

@sijie
Copy link
Member

sijie commented Nov 11, 2020

@hnail overall the change looks really good. I made a few comments. PTAL.

Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, just left some minor comments.

@codelipenghui
Copy link
Contributor

@congbobo184 Could you please help review this PR?

@hnail
Copy link
Contributor Author

hnail commented Nov 13, 2020

/pulsarbot run-failure-checks

@codelipenghui codelipenghui merged commit c01b1ee into apache:master Nov 17, 2020
flowchartsman pushed a commit to flowchartsman/pulsar that referenced this pull request Nov 17, 2020
Fixes apache#7642
Fixes apache#7674 

### Motivation
Protobuf  Native Schema Support :  [PIP-Add ProtobufNative Schema Support ](https://docs.google.com/document/d/1XR_MNOuSXyig-CKsdVhr6IXvFwziBRdSoS3oEUiLFe8/edit?usp=sharing)

This PR proposes to import a new protobuf-v3 schema based on protobuf native `Descriptor`, Current  `ProtobufSchema` is based on AVRO schema,  this Causes the following restrictions :

- Current [ProtobufSchema](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java) based on avro-protobuf only works when JVM classloader can load `Java implementation class of GeneratedMessageV3` . this is unfriendly for long-running server ( e.g. Presto ), restart server for update class is unfriendly.
- Describe `protobuf` schema by `avro` schema will cause losses of information, so based current [ProtobufSchema](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java) can’t support `AutoConsume` by `DynamicMessage`.
- The amount of support avro language is less than support protobuf language.

In consideration of backward compatibility , we add a new schema type  `SchemaType.PROTOBUF_NATIVE` base on protobuf-v3 native Descriptor instead of modify SchemaType.PROTOBUF, aim to support `GenericProtobufNativeSchema` and `AutoConsumeSchema` for PROTOBUF.

### Modifications
Describe in [PIP-Add ProtobufNative Schema Support ](https://docs.google.com/document/d/1XR_MNOuSXyig-CKsdVhr6IXvFwziBRdSoS3oEUiLFe8/edit?usp=sharing)
@hnail hnail deleted the support_protobuf_native_schema branch December 1, 2020 11:19
sijie pushed a commit that referenced this pull request Dec 30, 2020
jiazhai pushed a commit that referenced this pull request Feb 1, 2021
Fixes #4747 
Fixes #7652 

### Motivation

PIP-71: https://github.com/apache/pulsar/wiki/PIP-71:-Pulsar-SQL-migrate-SchemaHandle-to-presto-decoder

**Pip-Doc** : [[PIP-71][SQL]Migrate SchemaHandle to Presto-decoder](https://docs.google.com/document/d/1KwG0GoHccju4-QNPfvT6tOwhp5Fvs6-iZlfLooPxTDM/edit?usp=sharing)

In current version , pulsar-presto deserialize fields  rely on SchemaHandler , but this causes the following restrictions :

- **Metadata**: current nested field is dissociate with presto ParameterizedType , It treated nested field as a separated field , so  presto compiler can’t understand the type hierarchy . nested field should be Row type in presto (e.g.  Hive struct type support) . In the same way,array \ map type also shoud associate with presto ParameterizedTypes.
- **Decoder** : SchemaHandler is hard to work with  `RecordCursor.getObject()` to support ROW,MAP,ARRAY .etc

The **motivations** of this pull request :
-  ` PulsarMetadata` take advantage of `ParameterizedType`  to describe `row/array/map` Type instead of resolve nested columns in pulsar-presto connecter.
- Customize `RowDecoder | RowDecoderFactory | ColumnDecoder` to work with pulsar interface, and with some our own extensions  compare to presto original version , we can support more type for backward compatible (e.g. 
 ` TIMESTAMP\DATE\TIME\Real\ARRAY\MAP\ROW ` support).
- Decouple avro or schema type with `pulsar-presto main module` (RecordSet,ConnectorMetadata .etc ), aim to friendly with other schema type ( [ProtobufNative](#8372)  、thrift etc..).

### Modifications

Describe in [PIP-71: Pulsar SQL migrate SchemaHandle to presto decoder](https://docs.google.com/document/d/1KwG0GoHccju4-QNPfvT6tOwhp5Fvs6-iZlfLooPxTDM/edit?usp=sharing) 

----

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): (**yes** )
  - The public API: (no)
  - The schema: ( no)
  - The default values of configurations: (no)
  - The wire protocol: (no)
  - The rest endpoints: (no)
  - The admin cli options: (no)
  - Anything that affects deployment: (no)

### Documentation

  - Does this pull request introduce a new feature? (yes)
  [[PIP][SQL]Migrate SchemaHandle to Presto-decoder](https://docs.google.com/document/d/1KwG0GoHccju4-QNPfvT6tOwhp5Fvs6-iZlfLooPxTDM/edit?usp=sharing)

* codeStyle fix

* Update pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java

Co-authored-by: ran <[email protected]>

* Update pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java

Co-authored-by: ran <[email protected]>

* Update pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java

Co-authored-by: ran <[email protected]>

* add keyValue\Primitive schema test && add schema cyclic definition detect

* merge master

* merge master

Co-authored-by: wangguowei <[email protected]>
Co-authored-by: ran <[email protected]>
@fjod
Copy link

fjod commented Mar 23, 2021

I have a question about FileDescriptorSet field. In https://github.com/apache/pulsar/blob/c01b1eeda3221bdbf863bf0f3f8373e93d90adef/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java file there is an example test class. It is generated from Test.proto and ExternalMessage.proto files. The problem is that, no matter what I try with protoc , I can not get same FileDescriptorSet content. It's always slightly different. I tried csharp, java, cpp output, with or without --descriptor_set_out - I always get different byte array (and base64 string). And I can not create producer/consumer on apache pulsar client with FileDescriptorSet generated by me, I get com.google.protobuf.InvalidProtocolBufferException.invalidEndTag exception which as far as I googled tells that data is corrupted. When I try to create producer with FileDescriptorSet data from test, it works. What am I doing wrong while generating descriptors using protoc ?

@hnail
Copy link
Contributor Author

hnail commented Mar 24, 2021

I have a question about FileDescriptorSet field. In https://github.com/apache/pulsar/blob/c01b1eeda3221bdbf863bf0f3f8373e93d90adef/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java file there is an example test class. It is generated from Test.proto and ExternalMessage.proto files. The problem is that, no matter what I try with protoc , I can not get same FileDescriptorSet content. It's always slightly different. I tried csharp, java, cpp output, with or without --descriptor_set_out - I always get different byte array (and base64 string). And I can not create producer/consumer on apache pulsar client with FileDescriptorSet generated by me, I get com.google.protobuf.InvalidProtocolBufferException.invalidEndTag exception which as far as I googled tells that data is corrupted. When I try to create producer with FileDescriptorSet data from test, it works. What am I doing wrong while generating descriptors using protoc ?

hello , happy for your take notice of this pull request .
I test as your description with the two following test case :

// buid FileDescriptorSet byte[] by Class  
byte[] fileDescriptorSetBytesByClass = DescriptorProtos.FileDescriptorSet.newBuilder()
                .addFile(Service.ServiceRequest.getDescriptor().getFile().toProto())
                .build().toByteArray();  

DescriptorProtos.FileDescriptorSet fileDescriptorSetByClass = DescriptorProtos.FileDescriptorSet
                .parseFrom(fileDescriptorSetBytesByClass);

// print FileDescriptorSet string which buid by Class
System.out.println(new String(fileDescriptorSetByClass.toBuilder().build().toByteArray(),"utf-8"));
  • FileDescriptorSet build by protoc command , same as your description:
// buid FileDescriptorSet byte[] by 'protoc --include_imports --descriptor_set_out Request.desc Request.proto
byte[] fileDescriptorSetBytesByProtoC = FileUtils.readFileToByteArray(new File("Request.desc"));

DescriptorProtos.FileDescriptorSet fileDescriptorSetByProtoC = DescriptorProtos.FileDescriptorSet
                .parseFrom(fileDescriptorSetBytesByProtoC);

// print FileDescriptorSet string which buid by ProtoC
System.out.println(new String(fileDescriptorSetByClass.toBuilder().build().toByteArray(),"utf-8"));

The two realize above works for me :

  • The serialized FileDescriptorSet byte[] is slightly different as your description, I think maybe java proto class lack of information compare with proto file when compile java class .
  • Deserialize FileDescriptorSet is by DescriptorProtos.FileDescriptorSet.parseFrom(bytes[]) in two way above ProtobufNativeSchemaUtils#deserialize . so , even though byte[] is slightly different , but at the same of working .
  • The current realize is reference google-doc : protobuf v3 Self-describing Messages

so , I think the reason is new ObjectMapper().writeValueAsBytes(schemaData); which serialize byte[] to String , may be is 'UTF-8' or 'Base64' ?

@fjod
Copy link

fjod commented Mar 24, 2021

I found the culprit here, I deleted important information from source .proto file like java_package and java_outer_classname, and protoc was generating different info which apache pulsar was not able to use.

@hnail
Copy link
Contributor Author

hnail commented Mar 24, 2021

I found the culprit here, I deleted important information from source .proto file like java_package and java_outer_classname, and protoc was generating different info which apache pulsar was not able to use.

I recommend you can use JAVA API to create Schema like this :

SchemaDefinition def =  SchemaDefinition.<ProtoMsg>builder()
                .withPojo(ProtoMsg.class)
                .build();
Schema schema = Schema.PROTOBUF_NATIVE(def);
admin.schemas().createSchema(topic, schema.getSchemaInfo());

The java_package and java_outer_classname restrictions which you mentioned may need check in other language client when create schema by POJO API , be appreciate if your have time to help pull a issue to describe the problem for future optimization .

@fjod
Copy link

fjod commented Mar 24, 2021

The java_package and java_outer_classname restrictions which you mentioned may need check in other language client when create schema by POJO API , be appreciate if your have time to help pull a issue to describe the problem for future optimization .

I created issue here about csharp/java mismatch in generating Descriptors

protocolbuffers/protobuf#8425

@fjod
Copy link

fjod commented Mar 25, 2021

@hnail I think some bug in deserialize method of ProtobufNativeSchemaUtils. I generated descriptor using probuf-net library, here it is CgpUZXN0LnByb3RvEgVwcm90byImCgpYWFhNZXNzYWdlEgsKA2ZvbxgBIAEoCRILCgNiYXIYAiABKAFCD6oCDG5hdGl2ZUNoZWNrMmIGcHJvdG8z . If you check it on https://protogen.marcgravell.com/decode , it decodes fine. Here is json file with some additional fields:

{"fileDescriptorSet":"ClMKClRlc3QucHJvdG8SC25hdGl2ZUNoZWNrIjAKClhYWE1lc3NhZ2USEAoDZm9vGAEgASgJUgNmb28SEAoDYmFyGAIgASgBUgNiYXJiBnByb3RvMw==","rootMessageTypeName":"proto.XXXMessage","rootFileDescriptorName":"Test.proto"}

When I try to send it to apache-pulsar server, I get null-pointer exception on line 104 at

I created sample java app copy-pasting code from ProtobufNativeSchemaUtils. Here is simpler version of deserializer:

         File file;
         Descriptors.Descriptor descriptor;
         file = new File("D:\\serialized");
         byte[] fileContent = Files.readAllBytes(file.toPath());
         ProtobufNativeSchemaData schemaData = new ObjectMapper().readValue(fileContent, ProtobufNativeSchemaData.class);
         FileDescriptorSet fileDescriptorSet = FileDescriptorSet.parseFrom(schemaData.fileDescriptorSet);   
        Map<String, FileDescriptorProto> fileDescriptorProtoCache = new HashMap<>();
        Map<String, Descriptors.FileDescriptor> fileDescriptorCache = new HashMap<>();
        fileDescriptorSet.getFileList().forEach(fileDescriptorProto -> fileDescriptorProtoCache.put(fileDescriptorProto.getName(), fileDescriptorProto));        
        FileDescriptorProto rootFileDescriptorProto = fileDescriptorProtoCache.get("Test.proto");
        deserializeFileDescriptor(rootFileDescriptorProto, fileDescriptorCache, fileDescriptorProtoCache);
        Descriptors.FileDescriptor fileDescriptor = fileDescriptorCache.get("Test.proto");      
        System.out.print(fileDescriptor.toProto());

output is:

name: "Test.proto"
package: "nativeCheck"
message_type {
  name: "XXXMessage"
  field {
    name: "foo"
    number: 1
    label: LABEL_OPTIONAL
    type: TYPE_STRING
    json_name: "foo"
  }
  field {
    name: "bar"
    number: 2
    label: LABEL_OPTIONAL
    type: TYPE_DOUBLE
    json_name: "bar"
  }
}
syntax: "proto3"

So Descriptor is fine! But if I try to use code from the repo:

        Descriptors.Descriptor descriptor2;
        Descriptors.FileDescriptor fileDescriptor2 = fileDescriptorCache.get(schemaData.rootMessageTypeName);
        String package1 = fileDescriptor2.getPackage();
                //trim package
        String[] paths = StringUtils.removeFirst(schemaData.rootMessageTypeName, package1).replaceFirst("\\.", "").split("\\.");
        //extract root message
        descriptor2 = fileDescriptor2.findMessageTypeByName(paths[0]);
        //extract nested message
        for (int i = 1; i < paths.length; i++) {
            descriptor2 = descriptor2.findNestedTypeByName(paths[i]);
        }
        System.out.print(descriptor2.toProto());

I get nullpointer exception on line with getPackage(); - similar to apache-pulsar exception.
image
I guess our generated descriptor does not contain such "package" and fails therefore? It would be great if we can use descriptor from our client too ;)

@hnail
Copy link
Contributor Author

hnail commented Mar 25, 2021

@fjod Hi , Your rootMessageTypeName is incorrect , change to nativeCheck.XXXMessage , rootMessageTypeName must be full path , nativeCheck is your package path .

Simpler Test code :

String schemaDataBytes = "{\"fileDescriptorSet" +
                "\":\"ClMKClRlc3QucHJvdG8SC25hdGl2ZUNoZWNrIjAKClhYWE1lc3NhZ2USEAoDZm9vGAEgASgJUgNmb28SEAoDYmFyGAIgASgBUgNiYXJiBnByb3RvMw==\",\"rootMessageTypeName\":\"nativeCheck.XXXMessage\",\"rootFileDescriptorName\":\"Test.proto\"}";
        
Descriptors.Descriptor descriptor = ProtobufNativeSchemaUtils.deserialize(schemaDataBytes.getBytes());

System.out.println(descriptor);

It is works fine .

lhotari pushed a commit to apache/pulsar-sql that referenced this pull request Oct 18, 2024
Fixes #4747 
Fixes #7652 

### Motivation

PIP-71: https://github.com/apache/pulsar/wiki/PIP-71:-Pulsar-SQL-migrate-SchemaHandle-to-presto-decoder

**Pip-Doc** : [[PIP-71][SQL]Migrate SchemaHandle to Presto-decoder](https://docs.google.com/document/d/1KwG0GoHccju4-QNPfvT6tOwhp5Fvs6-iZlfLooPxTDM/edit?usp=sharing)

In current version , pulsar-presto deserialize fields  rely on SchemaHandler , but this causes the following restrictions :

- **Metadata**: current nested field is dissociate with presto ParameterizedType , It treated nested field as a separated field , so  presto compiler can’t understand the type hierarchy . nested field should be Row type in presto (e.g.  Hive struct type support) . In the same way,array \ map type also shoud associate with presto ParameterizedTypes.
- **Decoder** : SchemaHandler is hard to work with  `RecordCursor.getObject()` to support ROW,MAP,ARRAY .etc

The **motivations** of this pull request :
-  ` PulsarMetadata` take advantage of `ParameterizedType`  to describe `row/array/map` Type instead of resolve nested columns in pulsar-presto connecter.
- Customize `RowDecoder | RowDecoderFactory | ColumnDecoder` to work with pulsar interface, and with some our own extensions  compare to presto original version , we can support more type for backward compatible (e.g. 
 ` TIMESTAMP\DATE\TIME\Real\ARRAY\MAP\ROW ` support).
- Decouple avro or schema type with `pulsar-presto main module` (RecordSet,ConnectorMetadata .etc ), aim to friendly with other schema type ( [ProtobufNative](apache/pulsar#8372)  、thrift etc..).

### Modifications

Describe in [PIP-71: Pulsar SQL migrate SchemaHandle to presto decoder](https://docs.google.com/document/d/1KwG0GoHccju4-QNPfvT6tOwhp5Fvs6-iZlfLooPxTDM/edit?usp=sharing) 

----

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): (**yes** )
  - The public API: (no)
  - The schema: ( no)
  - The default values of configurations: (no)
  - The wire protocol: (no)
  - The rest endpoints: (no)
  - The admin cli options: (no)
  - Anything that affects deployment: (no)

### Documentation

  - Does this pull request introduce a new feature? (yes)
  [[PIP][SQL]Migrate SchemaHandle to Presto-decoder](https://docs.google.com/document/d/1KwG0GoHccju4-QNPfvT6tOwhp5Fvs6-iZlfLooPxTDM/edit?usp=sharing)

* codeStyle fix

* Update pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java

Co-authored-by: ran <[email protected]>

* Update pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java

Co-authored-by: ran <[email protected]>

* Update pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java

Co-authored-by: ran <[email protected]>

* add keyValue\Primitive schema test && add schema cyclic definition detect

* merge master

* merge master

Co-authored-by: wangguowei <[email protected]>
Co-authored-by: ran <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants