diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 958dfea97d48e..45b38883bf347 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -242,7 +242,7 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_SERVER, dynamic = true, - doc = "The maximum number of tenants that each pulsar cluster can create." + doc = "The maximum number of tenants that each pulsar cluster can create." + "This configuration is not precise control, in a concurrent scenario, the threshold will be exceeded." ) private int maxTenants = 0; @@ -453,9 +453,9 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "When a namespace is created without specifying the number of bundle, this" + " value will be used as the default") private int defaultNumberOfNamespaceBundles = 4; - + @FieldContext( - category = CATEGORY_POLICIES, + category = CATEGORY_POLICIES, dynamic = true, doc = "The maximum number of namespaces that each tenant can create." + "This configuration is not precise control, in a concurrent scenario, the threshold will be exceeded") @@ -1723,7 +1723,8 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private Set schemaRegistryCompatibilityCheckers = Sets.newHashSet( "org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck", - "org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck" + "org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck", + "org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck" ); /**** --- WebSocket --- ****/ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java new file mode 100644 index 0000000000000..6a3f7e65905b0 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; +import org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaUtils; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; +import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; + +import static com.google.protobuf.Descriptors.Descriptor; + +/** + * The {@link SchemaCompatibilityCheck} implementation for {@link SchemaType#PROTOBUF_NATIVE}. + */ +public class ProtobufNativeSchemaCompatibilityCheck implements SchemaCompatibilityCheck { + + @Override + public SchemaType getSchemaType() { + return SchemaType.PROTOBUF_NATIVE; + } + + @Override + public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException { + Descriptor fromDescriptor = ProtobufNativeSchemaUtils.deserialize(from.getData()); + Descriptor toDescriptor = ProtobufNativeSchemaUtils.deserialize(to.getData()); + switch (strategy) { + case BACKWARD_TRANSITIVE: + case BACKWARD: + case FORWARD_TRANSITIVE: + case FORWARD: + case FULL_TRANSITIVE: + case FULL: + checkRootMessageChange(fromDescriptor, toDescriptor, strategy); + return; + case ALWAYS_COMPATIBLE: + return; + default: + throw new IncompatibleSchemaException("Unknown SchemaCompatibilityStrategy."); + } + } + + @Override + public void checkCompatible(Iterable from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException { + for (SchemaData schemaData : from) { + checkCompatible(schemaData, to, strategy); + } + } + + private void checkRootMessageChange(Descriptor fromDescriptor, Descriptor toDescriptor, + SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException { + if (!fromDescriptor.getFullName().equals(toDescriptor.getFullName())) { + throw new IncompatibleSchemaException("Protobuf root message isn't allow change!"); + } + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaDataValidator.java new file mode 100644 index 0000000000000..acf2878f13a97 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/ProtobufNativeSchemaDataValidator.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema.validator; + +import com.google.protobuf.Descriptors; +import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; +import org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaUtils; +import org.apache.pulsar.common.protocol.schema.SchemaData; + +public class ProtobufNativeSchemaDataValidator implements SchemaDataValidator { + + @Override + public void validate(SchemaData schemaData) throws InvalidSchemaDataException { + Descriptors.Descriptor descriptor; + try { + descriptor = ProtobufNativeSchemaUtils.deserialize(schemaData.getData()); + } catch (Exception e) { + throw new InvalidSchemaDataException("deserialize ProtobufNative Schema failed", e); + } + if (descriptor == null) { + throw new InvalidSchemaDataException("protobuf root message descriptor is null , please recheck rootMessageTypeName or rootFileDescriptorName conf. "); + } + } + + public static ProtobufNativeSchemaDataValidator of() { + return INSTANCE; + } + + private static final ProtobufNativeSchemaDataValidator INSTANCE = new ProtobufNativeSchemaDataValidator(); + + private ProtobufNativeSchemaDataValidator() { + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java index 5e5f846a7ea11..34a5f3447cf80 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java @@ -41,6 +41,9 @@ static void validateSchemaData(SchemaData schemaData) throws InvalidSchemaDataEx case PROTOBUF: StructSchemaDataValidator.of().validate(schemaData); break; + case PROTOBUF_NATIVE: + ProtobufNativeSchemaDataValidator.of().validate(schemaData); + break; case STRING: StringSchemaDataValidator.of().validate(schemaData); break; diff --git a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto index 32790b2868411..6ceb8ad3ada81 100644 --- a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto +++ b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto @@ -44,6 +44,7 @@ message SchemaInfo { LOCALDATE = 18; LOCALTIME = 19; LOCALDATETIME = 20; + PROTOBUFNATIVE = 21; } message KeyValuePair { required string key = 1; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheckTest.java new file mode 100644 index 0000000000000..2c5d099f551e0 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheckTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaUtils; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; +import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static com.google.protobuf.Descriptors.Descriptor; + +public class ProtobufNativeSchemaCompatibilityCheckTest { + + private static final SchemaData schemaData1 = getSchemaData(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.getDescriptor()); + + private static final SchemaData schemaData2 = getSchemaData(org.apache.pulsar.client.api.schema.proto.Test.SubMessage.getDescriptor()); + + /** + * make sure protobuf root message isn't allow change + */ + @Test + public void testRootMessageChange() { + ProtobufNativeSchemaCompatibilityCheck compatibilityCheck = new ProtobufNativeSchemaCompatibilityCheck(); + Assert.assertFalse(compatibilityCheck.isCompatible(schemaData2, schemaData1, + SchemaCompatibilityStrategy.FULL), + "Protobuf root message isn't allow change"); + } + + private static SchemaData getSchemaData(Descriptor descriptor) { + return SchemaData.builder().data(ProtobufNativeSchemaUtils.serialize(descriptor)).type(SchemaType.PROTOBUF_NATIVE).build(); + } +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index e0d3f0fec969f..b5bea77931b04 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -255,6 +255,27 @@ static Schema PROTOBUF(Sch return DefaultImplementation.newProtobufSchema(schemaDefinition); } + /** + * Create a Protobuf-Native schema type by extracting the fields of the specified class. + * + * @param clazz the Protobuf generated class to be used to extract the schema + * @return a Schema instance + */ + static Schema PROTOBUF_NATIVE(Class clazz) { + return DefaultImplementation.newProtobufNativeSchema(SchemaDefinition.builder().withPojo(clazz).build()); + } + + /** + * Create a Protobuf-Native schema type with schema definition. + * + * @param schemaDefinition schemaDefinition the definition of the schema + * @return a Schema instance + */ + static Schema PROTOBUF_NATIVE( + SchemaDefinition schemaDefinition) { + return DefaultImplementation.newProtobufNativeSchema(schemaDefinition); + } + /** * Create a Avro schema type by default configuration of the class. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java index 589ac69cd2b37..9760cad2e50a9 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java @@ -263,6 +263,14 @@ public static Schema newPr .invoke(null, schemaDefinition)); } + public static Schema newProtobufNativeSchema( + SchemaDefinition schemaDefinition) { + return catchExceptions( + () -> (Schema) getStaticMethod( + "org.apache.pulsar.client.impl.schema.ProtobufNativeSchema", "of", SchemaDefinition.class) + .invoke(null, schemaDefinition)); + } + public static Schema newJSONSchema(SchemaDefinition schemaDefinition) { return catchExceptions( () -> (Schema) getStaticMethod( @@ -326,10 +334,22 @@ public static Schema getSchema(SchemaInfo schemaInfo) { } public static GenericSchema getGenericSchema(SchemaInfo schemaInfo) { - return catchExceptions( - () -> (GenericSchema) getStaticMethod( - "org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl", - "of", SchemaInfo.class).invoke(null, schemaInfo)); + switch (schemaInfo.getType()) { + case PROTOBUF_NATIVE: + return (GenericSchema) ReflectionUtils.catchExceptions(() -> { + return (GenericSchema) ReflectionUtils.getStaticMethod( + "org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema", + "of", new Class[]{SchemaInfo.class}).invoke((Object) null, schemaInfo); + }); + default: + return (GenericSchema) ReflectionUtils.catchExceptions(() -> { + return (GenericSchema) ReflectionUtils.getStaticMethod( + "org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl", + "of", new Class[]{SchemaInfo.class}).invoke((Object) null, schemaInfo); + + }); + } + } public static RecordSchemaBuilder newRecordSchemaBuilder(String name) { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java index 7b90972750665..44d74e2ab7d3e 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java @@ -73,6 +73,7 @@ public String getSchemaDefinition() { case AVRO: case JSON: case PROTOBUF: + case PROTOBUF_NATIVE: return new String(schema, UTF_8); case KEY_VALUE: KeyValue schemaInfoKeyValue = diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java index a7328c7b44d6c..d9650a1b9836a 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java @@ -139,6 +139,11 @@ public enum SchemaType { */ LOCAL_DATE_TIME(19), + /** + * Protobuf native schema base on Descriptor. + */ + PROTOBUF_NATIVE(20), + // // Schemas that don't have schema info. the value should be negative. // @@ -196,6 +201,7 @@ public static SchemaType valueOf(int value) { case 17: return LOCAL_DATE; case 18: return LOCAL_TIME; case 19: return LOCAL_DATE_TIME; + case 20: return PROTOBUF_NATIVE; case -1: return BYTES; case -2: return AUTO; case -3: return AUTO_CONSUME; @@ -244,6 +250,7 @@ public static boolean isStructType(SchemaType type) { case AVRO: case JSON: case PROTOBUF: + case PROTOBUF_NATIVE: return true; default: return false; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java index af902f0a679f6..442ede177b37d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.client.impl.schema; -import static com.google.common.base.Preconditions.checkState; - import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; @@ -28,11 +26,14 @@ import org.apache.pulsar.client.api.schema.SchemaInfoProvider; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaInfo; import java.util.concurrent.ExecutionException; +import static com.google.common.base.Preconditions.checkState; + /** * Auto detect schema. */ @@ -155,6 +156,8 @@ private GenericSchema generateSchema(SchemaInfo schemaInfo) { return GenericJsonSchema.of(schemaInfo,useProvidedSchemaAsReaderSchema); case AVRO: return GenericAvroSchema.of(schemaInfo,useProvidedSchemaAsReaderSchema); + case PROTOBUF_NATIVE: + return GenericProtobufNativeSchema.of(schemaInfo, useProvidedSchemaAsReaderSchema); default: throw new IllegalArgumentException("Currently auto consume works for type '" + schemaInfo.getType() + "' is not supported yet"); @@ -199,6 +202,8 @@ public static Schema getSchema(SchemaInfo schemaInfo) { return GenericJsonSchema.of(schemaInfo); case AVRO: return GenericAvroSchema.of(schemaInfo); + case PROTOBUF_NATIVE: + return GenericProtobufNativeSchema.of(schemaInfo); case KEY_VALUE: KeyValue kvSchemaInfo = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java new file mode 100644 index 0000000000000..343fb93743c02 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Descriptors; +import com.google.protobuf.GeneratedMessageV3; +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.api.schema.SchemaReader; +import org.apache.pulsar.client.impl.schema.reader.ProtobufNativeReader; +import org.apache.pulsar.client.impl.schema.writer.ProtobufNativeWriter; +import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +/** + * A schema implementation to deal with protobuf generated messages. + */ +public class ProtobufNativeSchema extends AbstractStructSchema { + + public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__"; + + @Getter + @AllArgsConstructor + public static class ProtoBufParsingInfo { + private final int number; + private final String name; + private final String type; + private final String label; + // For future nested fields + private final Map definition; + } + + private static Descriptors.Descriptor createProtobufNativeSchema(Class pojo) { + try { + Method method = pojo.getMethod("getDescriptor"); + Descriptors.Descriptor descriptor = (Descriptors.Descriptor) method.invoke(null); + return descriptor; + } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + + private ProtobufNativeSchema(SchemaInfo schemaInfo, T protoMessageInstance) { + super(schemaInfo); + setReader(new ProtobufNativeReader<>(protoMessageInstance)); + setWriter(new ProtobufNativeWriter<>()); + // update properties with protobuf related properties + Map allProperties = new HashMap<>(); + allProperties.putAll(schemaInfo.getProperties()); + // set protobuf parsing info + allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance)); + schemaInfo.setProperties(allProperties); + } + + private String getParsingInfo(T protoMessageInstance) { + List protoBufParsingInfos = new LinkedList<>(); + protoMessageInstance.getDescriptorForType().getFields().forEach(new Consumer() { + @Override + public void accept(Descriptors.FieldDescriptor fieldDescriptor) { + protoBufParsingInfos.add(new ProtoBufParsingInfo(fieldDescriptor.getNumber(), + fieldDescriptor.getName(), fieldDescriptor.getType().name(), + fieldDescriptor.toProto().getLabel().name(), null)); + } + }); + + try { + return new ObjectMapper().writeValueAsString(protoBufParsingInfos); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + public Descriptors.Descriptor getProtobufNativeSchema() { + return ProtobufNativeSchemaUtils.deserialize(this.schemaInfo.getSchema()); + } + + @Override + protected SchemaReader loadReader(BytesSchemaVersion schemaVersion) { + throw new RuntimeException("ProtobufNativeSchema don't support schema versioning"); + } + + public static ProtobufNativeSchema of(Class pojo) { + return of(pojo, new HashMap<>()); + } + + public static ProtobufNativeSchema ofGenericClass(Class pojo, Map properties) { + SchemaDefinition schemaDefinition = SchemaDefinition.builder().withPojo(pojo).withProperties(properties).build(); + return ProtobufNativeSchema.of(schemaDefinition); + } + + public static ProtobufNativeSchema of(SchemaDefinition schemaDefinition) { + Class pojo = schemaDefinition.getPojo(); + + if (!GeneratedMessageV3.class.isAssignableFrom(pojo)) { + throw new IllegalArgumentException(GeneratedMessageV3.class.getName() + + " is not assignable from " + pojo.getName()); + } + Descriptors.Descriptor descriptor = createProtobufNativeSchema(schemaDefinition.getPojo()); + + SchemaInfo schemaInfo = SchemaInfo.builder() + .schema(ProtobufNativeSchemaUtils.serialize(descriptor)) + .type(SchemaType.PROTOBUF_NATIVE) + .name("") + .properties(schemaDefinition.getProperties()) + .build(); + try { + return new ProtobufNativeSchema(schemaInfo, + (GeneratedMessageV3) pojo.getMethod("getDefaultInstance").invoke(null)); + } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new IllegalArgumentException(e); + } + } + + public static ProtobufNativeSchema of( + Class pojo, Map properties) { + return ofGenericClass(pojo, properties); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java new file mode 100644 index 0000000000000..7a806b5891588 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import static com.google.protobuf.DescriptorProtos.FileDescriptorProto; +import static com.google.protobuf.DescriptorProtos.FileDescriptorSet; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Descriptors; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.SchemaSerializationException; +import org.apache.pulsar.common.protocol.schema.ProtobufNativeSchemaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Protobuf-Native schema util used for serialize/deserialize + * between {@link com.google.protobuf.Descriptors.Descriptor} and + * {@link org.apache.pulsar.common.protocol.schema.ProtobufNativeSchemaData}. + */ +public class ProtobufNativeSchemaUtils { + + public static byte[] serialize(Descriptors.Descriptor descriptor) { + byte[] schemaDataBytes; + try { + Map fileDescriptorProtoCache = new HashMap<>(); + //recursively cache all FileDescriptorProto + serializeFileDescriptor(descriptor.getFile(), fileDescriptorProtoCache); + + //extract root message path + String rootMessageTypeName = descriptor.getFullName(); + String rootFileDescriptorName = descriptor.getFile().getFullName(); + //build FileDescriptorSet , this is equal to < protoc --include_imports --descriptor_set_out > + byte[] fileDescriptorSet = FileDescriptorSet.newBuilder().addAllFile(fileDescriptorProtoCache.values()).build().toByteArray(); + + //serialize to bytes + ProtobufNativeSchemaData schemaData = ProtobufNativeSchemaData.builder().fileDescriptorSet(fileDescriptorSet) + .rootFileDescriptorName(rootFileDescriptorName).rootMessageTypeName(rootMessageTypeName).build(); + schemaDataBytes = new ObjectMapper().writeValueAsBytes(schemaData); + logger.debug("descriptor '{}' serialized to '{}'.", descriptor.getFullName(), schemaDataBytes); + } catch (Exception e) { + e.printStackTrace(); + throw new SchemaSerializationException(e); + } + return schemaDataBytes; + } + + private static void serializeFileDescriptor(Descriptors.FileDescriptor fileDescriptor, Map fileDescriptorCache) { + fileDescriptor.getDependencies().forEach(dependency -> { + if (!fileDescriptorCache.containsKey(dependency.getFullName())) { + serializeFileDescriptor(dependency, fileDescriptorCache); + } + } + ); + String[] unResolvedFileDescriptNames = fileDescriptor.getDependencies().stream(). + filter(item -> !fileDescriptorCache.containsKey(item.getFullName())).map(Descriptors.FileDescriptor::getFullName).toArray(String[]::new); + if (unResolvedFileDescriptNames.length == 0) { + fileDescriptorCache.put(fileDescriptor.getFullName(), fileDescriptor.toProto()); + } else { + throw new SchemaSerializationException(fileDescriptor.getFullName() + " can't resolve dependency '" + unResolvedFileDescriptNames + "'."); + } + } + + public static Descriptors.Descriptor deserialize(byte[] schemaDataBytes) { + Descriptors.Descriptor descriptor; + try { + ProtobufNativeSchemaData schemaData = new ObjectMapper().readValue(schemaDataBytes, ProtobufNativeSchemaData.class); + + Map fileDescriptorProtoCache = new HashMap<>(); + Map fileDescriptorCache = new HashMap<>(); + FileDescriptorSet fileDescriptorSet = FileDescriptorSet.parseFrom(schemaData.getFileDescriptorSet()); + fileDescriptorSet.getFileList().forEach(fileDescriptorProto -> fileDescriptorProtoCache.put(fileDescriptorProto.getName(), fileDescriptorProto)); + FileDescriptorProto rootFileDescriptorProto = fileDescriptorProtoCache.get(schemaData.getRootFileDescriptorName()); + + //recursively build FileDescriptor + deserializeFileDescriptor(rootFileDescriptorProto, fileDescriptorCache, fileDescriptorProtoCache); + //extract root fileDescriptor + Descriptors.FileDescriptor fileDescriptor = fileDescriptorCache.get(schemaData.getRootFileDescriptorName()); + //trim package + String[] paths = StringUtils.removeFirst(schemaData.getRootMessageTypeName(), fileDescriptor.getPackage()).replaceFirst(".", "").split("\\."); + //extract root message + descriptor = fileDescriptor.findMessageTypeByName(paths[0]); + //extract nested message + for (int i = 1; i < paths.length; i++) { + descriptor = descriptor.findNestedTypeByName(paths[i]); + } + logger.debug("deserialize '{}' to descriptor: '{}'.", schemaDataBytes, descriptor.getFullName()); + } catch (Exception e) { + e.printStackTrace(); + throw new SchemaSerializationException(e); + } + + return descriptor; + } + + private static void deserializeFileDescriptor(FileDescriptorProto fileDescriptorProto, Map fileDescriptorCache, Map fileDescriptorProtoCache) { + fileDescriptorProto.getDependencyList().forEach(dependencyFileDescriptorName -> { + if (!fileDescriptorCache.containsKey(dependencyFileDescriptorName)) { + FileDescriptorProto dependencyFileDescriptor = fileDescriptorProtoCache.get(dependencyFileDescriptorName); + deserializeFileDescriptor(dependencyFileDescriptor, fileDescriptorCache, fileDescriptorProtoCache); + } + }); + + Descriptors.FileDescriptor[] dependencyFileDescriptors = fileDescriptorProto.getDependencyList().stream().map(dependency -> { + if (fileDescriptorCache.containsKey(dependency)) { + return fileDescriptorCache.get(dependency); + } else { + throw new SchemaSerializationException("'" + fileDescriptorProto.getName() + "' can't resolve dependency '" + dependency + "'."); + } + }).toArray(Descriptors.FileDescriptor[]::new); + + try { + Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencyFileDescriptors); + fileDescriptorCache.put(fileDescriptor.getFullName(), fileDescriptor); + } catch (Descriptors.DescriptorValidationException e) { + e.printStackTrace(); + throw new SchemaSerializationException(e); + } + } + + private static final Logger logger = LoggerFactory.getLogger(ProtobufNativeSchemaUtils.class); + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java new file mode 100644 index 0000000000000..eccc48247cdd4 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReader.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.pulsar.client.api.SchemaSerializationException; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.SchemaReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class GenericProtobufNativeReader implements SchemaReader { + + private Descriptors.Descriptor descriptor; + private byte[] schemaVersion; + private List fields; + + public GenericProtobufNativeReader(Descriptors.Descriptor descriptor) { + this(descriptor, null); + } + + public GenericProtobufNativeReader(Descriptors.Descriptor descriptor, byte[] schemaVersion) { + try { + this.schemaVersion = schemaVersion; + this.descriptor = descriptor; + this.fields = descriptor.getFields() + .stream() + .map(f -> new Field(f.getName(), f.getIndex())) + .collect(Collectors.toList()); + } catch (Exception e) { + log.error("GenericProtobufNativeReader init error", e); + throw new RuntimeException(e); + } + } + + @Override + public GenericProtobufNativeRecord read(byte[] bytes, int offset, int length) { + try { + if (!(bytes.length == length && offset == 0)) { //skip unnecessary bytes copy + bytes = Arrays.copyOfRange(bytes, offset, offset + length); + } + return new GenericProtobufNativeRecord(schemaVersion, descriptor, fields, DynamicMessage.parseFrom(descriptor, bytes)); + } catch (InvalidProtocolBufferException e) { + throw new SchemaSerializationException(e); + } + } + + @Override + public GenericProtobufNativeRecord read(InputStream inputStream) { + try { + return new GenericProtobufNativeRecord(schemaVersion, descriptor, fields, DynamicMessage.parseFrom(descriptor, inputStream)); + } catch (IOException e) { + throw new SchemaSerializationException(e); + } + } + + private static final Logger log = LoggerFactory.getLogger(GenericProtobufNativeReader.class); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeRecord.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeRecord.java new file mode 100644 index 0000000000000..51b4cb063da79 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeRecord.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import org.apache.pulsar.client.api.schema.Field; + +import java.util.List; + +public class GenericProtobufNativeRecord extends VersionedGenericRecord { + + private DynamicMessage record; + private Descriptors.Descriptor msgDesc; + + protected GenericProtobufNativeRecord(byte[] schemaVersion, Descriptors.Descriptor msgDesc, List fields, DynamicMessage record) { + super(schemaVersion, fields); + this.msgDesc = msgDesc; + this.record = record; + } + + @Override + public Object getField(String fieldName) { + return record.getField(msgDesc.findFieldByName(fieldName)); + } + + public DynamicMessage getProtobufRecord() { + return record; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeSchema.java new file mode 100644 index 0000000000000..949ad9e2fcf78 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeSchema.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import com.google.protobuf.Descriptors; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericRecordBuilder; +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.api.schema.SchemaReader; +import org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaUtils; +import org.apache.pulsar.client.impl.schema.SchemaUtils; +import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; +import org.apache.pulsar.common.schema.SchemaInfo; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Generic ProtobufNative schema. + */ +@Slf4j +public class GenericProtobufNativeSchema extends AbstractGenericSchema { + + Descriptors.Descriptor descriptor; + + public GenericProtobufNativeSchema(SchemaInfo schemaInfo) { + this(schemaInfo, true); + } + + public GenericProtobufNativeSchema(SchemaInfo schemaInfo, + boolean useProvidedSchemaAsReaderSchema) { + super(schemaInfo, useProvidedSchemaAsReaderSchema); + this.descriptor = parseProtobufSchema(schemaInfo); + this.fields = descriptor.getFields() + .stream() + .map(f -> new Field(f.getName(), f.getIndex())) + .collect(Collectors.toList()); + setReader(new GenericProtobufNativeReader(descriptor)); + setWriter(new GenericProtobufNativeWriter()); + } + + @Override + public List getFields() { + return fields; + } + + @Override + public GenericRecordBuilder newRecordBuilder() { + return new ProtobufNativeRecordBuilderImpl(this); + } + + @Override + protected SchemaReader loadReader(BytesSchemaVersion schemaVersion) { + SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get()); + if (schemaInfo != null) { + log.info("Load schema reader for version({}), schema is : {}", + SchemaUtils.getStringSchemaVersion(schemaVersion.get()), + schemaInfo); + Descriptors.Descriptor recordDescriptor = parseProtobufSchema(schemaInfo); + Descriptors.Descriptor readerSchemaDescriptor = useProvidedSchemaAsReaderSchema ? descriptor : recordDescriptor; + return new GenericProtobufNativeReader( + readerSchemaDescriptor, + schemaVersion.get()); + } else { + log.warn("No schema found for version({}), use latest schema : {}", + SchemaUtils.getStringSchemaVersion(schemaVersion.get()), + this.schemaInfo); + return reader; + } + } + + protected static Descriptors.Descriptor parseProtobufSchema(SchemaInfo schemaInfo) { + return ProtobufNativeSchemaUtils.deserialize(schemaInfo.getSchema()); + } + + public static GenericSchema of(SchemaInfo schemaInfo) { + return new GenericProtobufNativeSchema(schemaInfo, true); + } + + public static GenericSchema of(SchemaInfo schemaInfo, boolean useProvidedSchemaAsReaderSchema) { + return new GenericProtobufNativeSchema(schemaInfo, useProvidedSchemaAsReaderSchema); + } + + public Descriptors.Descriptor getProtobufNativeSchema() { + return descriptor; + } + + @Override + public boolean supportSchemaVersioning() { + return true; + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeWriter.java new file mode 100644 index 0000000000000..e81b692eaf430 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeWriter.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.SchemaWriter; + +public class GenericProtobufNativeWriter implements SchemaWriter { + @Override + public byte[] write(GenericRecord message) { + return ((GenericProtobufNativeRecord) message).getProtobufRecord().toByteArray(); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/ProtobufNativeRecordBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/ProtobufNativeRecordBuilderImpl.java new file mode 100644 index 0000000000000..e9f333ffeb662 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/ProtobufNativeRecordBuilderImpl.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericRecordBuilder; + +public class ProtobufNativeRecordBuilderImpl implements GenericRecordBuilder { + + private GenericProtobufNativeSchema genericSchema; + private DynamicMessage.Builder builder; + private Descriptors.Descriptor msgDesc; + + public ProtobufNativeRecordBuilderImpl(GenericProtobufNativeSchema genericSchema) { + this.genericSchema = genericSchema; + this.msgDesc = genericSchema.getProtobufNativeSchema(); + builder = DynamicMessage.newBuilder(msgDesc); + } + + @Override + public GenericRecordBuilder set(String fieldName, Object value) { + builder.setField(msgDesc.findFieldByName(fieldName), value); + return this; + } + + @Override + public GenericRecordBuilder set(Field field, Object value) { + builder.setField(msgDesc.findFieldByName(field.getName()), value); + return this; + } + + @Override + public GenericRecordBuilder clear(String fieldName) { + builder.clearField(msgDesc.findFieldByName(fieldName)); + return this; + } + + @Override + public GenericRecordBuilder clear(Field field) { + builder.clearField(msgDesc.findFieldByName(field.getName())); + return this; + } + + @Override + public GenericRecord build() { + return new GenericProtobufNativeRecord( + null, + genericSchema.getProtobufNativeSchema(), + genericSchema.getFields(), + builder.build() + ); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java new file mode 100644 index 0000000000000..1e75fccd71b04 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.reader; + +public class ProtobufNativeReader extends ProtobufReader { + + public ProtobufNativeReader(T protoMessageInstance) { + super(protoMessageInstance); + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java new file mode 100644 index 0000000000000..44d9128a26cdd --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.writer; + +public class ProtobufNativeWriter extends ProtobufWriter { + + public ProtobufNativeWriter() { + super(); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java new file mode 100644 index 0000000000000..9300ce5da34fd --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.schema.SchemaType; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; + +@Slf4j +public class ProtobufNativeSchemaTest { + + + private static final String EXPECTED_SCHEMA_JSON = "{\"fileDescriptorSet\":\"CtMDCgpUZXN0LnByb3RvEgVwcm90bxoSRXh0ZXJuYWxUZXN0LnByb3RvImUKClN1Yk1lc3NhZ2U" + + "SCwoDZm9vGAEgASgJEgsKA2JhchgCIAEoARo9Cg1OZXN0ZWRNZXNzYWdlEgsKA3VybBgBIAEoCRINCgV0aXRsZRgCIAEoCRIQCghzbmlwcGV0cxgDIAMoCSLlAQoLV" + + "GVzdE1lc3NhZ2USEwoLc3RyaW5nRmllbGQYASABKAkSEwoLZG91YmxlRmllbGQYAiABKAESEAoIaW50RmllbGQYBiABKAUSIQoIdGVzdEVudW0YBCABKA4yDy5wcm90by5U" + + "ZXN0RW51bRImCgtuZXN0ZWRGaWVsZBgFIAEoCzIRLnByb3RvLlN1Yk1lc3NhZ2USFQoNcmVwZWF0ZWRGaWVsZBgKIAMoCRI4Cg9leHRlcm5hbE1lc3NhZ2UYCyABKAsyHy5wcm90by" + + "5leHRlcm5hbC5FeHRlcm5hbE1lc3NhZ2UqJAoIVGVzdEVudW0SCgoGU0hBUkVEEAASDAoIRkFJTE9WRVIQAUItCiVvcmcuYXBhY2hlLnB1bHNhci5jbGllbnQuc2NoZW1hLnByb3" + + "RvQgRUZXN0YgZwcm90bzMKoAEKEkV4dGVybmFsVGVzdC5wcm90bxIOcHJvdG8uZXh0ZXJuYWwiOwoPRXh0ZXJuYWxNZXNzYWdlEhMKC3N0cmluZ0ZpZWxkGAEgA" + + "SgJEhMKC2RvdWJsZUZpZWxkGAIgASgBQjUKJW9yZy5hcGFjaGUucHVsc2FyLmNsaWVudC5zY2hlbWEucHJvdG9CDEV4dGVybmFsVGVzdGIGcHJvdG8z\"," + + "\"rootMessageTypeName\":\"proto.TestMessage\",\"rootFileDescriptorName\":\"Test.proto\"}"; + + @Test + public void testEncodeAndDecode() { + final String stringFieldValue = "StringFieldValue"; + org.apache.pulsar.client.schema.proto.Test.TestMessage testMessage = org.apache.pulsar.client.schema.proto.Test.TestMessage.newBuilder().setStringField(stringFieldValue).build(); + ProtobufNativeSchema protobufSchema = ProtobufNativeSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class); + + byte[] bytes = protobufSchema.encode(testMessage); + org.apache.pulsar.client.schema.proto.Test.TestMessage message = protobufSchema.decode(bytes); + + Assert.assertEquals(message.getStringField(), stringFieldValue); + } + + @Test + public void testSchema() { + ProtobufNativeSchema protobufSchema + = ProtobufNativeSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class); + + Assert.assertEquals(protobufSchema.getSchemaInfo().getType(), SchemaType.PROTOBUF_NATIVE); + + Assert.assertNotNull(ProtobufNativeSchemaUtils.deserialize(protobufSchema.getSchemaInfo().getSchema())); + Assert.assertEquals(new String(protobufSchema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8), EXPECTED_SCHEMA_JSON); + } + + @Test + public void testGenericOf() { + try { + ProtobufNativeSchema protobufNativeSchema + = ProtobufNativeSchema.ofGenericClass(org.apache.pulsar.client.schema.proto.Test.TestMessage.class, + new HashMap<>()); + } catch (Exception e) { + Assert.fail("Should not construct a ProtobufShema over a non-protobuf-generated class"); + } + + try { + ProtobufSchema protobufSchema + = ProtobufSchema.ofGenericClass(String.class, + Collections.emptyMap()); + Assert.fail("Should not construct a ProtobufNativeShema over a non-protobuf-generated class"); + } catch (Exception e) { + + } + } + + + @Test + public void testDecodeByteBuf() { + ProtobufNativeSchema protobufSchema + = ProtobufNativeSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class); + org.apache.pulsar.client.schema.proto.Test.TestMessage testMessage = + org.apache.pulsar.client.schema.proto.Test.TestMessage.newBuilder().build(); + byte[] bytes = protobufSchema.encode(org.apache.pulsar.client.schema.proto.Test.TestMessage.newBuilder().build()); + ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(bytes.length); + byteBuf.writeBytes(bytes); + + Assert.assertEquals(testMessage, protobufSchema.decode(byteBuf)); + + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtilsTest.java new file mode 100644 index 0000000000000..29ab73ac5c008 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtilsTest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import com.google.protobuf.Descriptors; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ProtobufNativeSchemaUtilsTest { + + @Test + public static void testSerialize() throws Exception { + byte[] data = ProtobufNativeSchemaUtils.serialize(org.apache.pulsar.client.schema.proto.Test.TestMessage.getDescriptor()); + Descriptors.Descriptor descriptor = ProtobufNativeSchemaUtils.deserialize(data); + Assert.assertNotNull(descriptor); + Assert.assertNotNull(descriptor.findFieldByName("nestedField").getMessageType()); + Assert.assertNotNull(descriptor.findFieldByName("externalMessage").getMessageType()); + } + + @Test + public static void testNestedMessage() throws Exception { + byte[] data = ProtobufNativeSchemaUtils.serialize(org.apache.pulsar.client.schema.proto.Test.SubMessage.NestedMessage.getDescriptor()); + Descriptors.Descriptor descriptor = ProtobufNativeSchemaUtils.deserialize(data); + Assert.assertNotNull(descriptor); + + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java index 4c353565202af..cb8138dd86646 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java @@ -40,23 +40,27 @@ public class ProtobufSchemaTest { private static final String EXPECTED_SCHEMA_JSON = "{\"type\":\"record\",\"name\":\"TestMessage\"," + "\"namespace\":\"org.apache.pulsar.client.schema.proto.Test\",\"fields\":[{\"name\":\"stringField\"," + "\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"}," + - "{\"name\":\"doubleField\",\"type\":\"double\",\"default\":0},{\"name\":\"intField\",\"type\":\"int\"," + - "\"default\":0},{\"name\":\"testEnum\",\"type\":{\"type\":\"enum\",\"name\":\"TestEnum\"," + - "\"symbols\":[\"SHARED\",\"FAILOVER\"]},\"default\":\"SHARED\"},{\"name\":\"nestedField\"," + - "\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubMessage\",\"fields\":[{\"name\":\"foo\"," + - "\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"bar\"," + - "\"type\":\"double\",\"default\":0}]}],\"default\":null},{\"name\":\"repeatedField\"," + - "\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}}]}"; - - private static final String EXPECTED_PARSING_INFO = "{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\",\"__PARSING_INFO__\":" + - "\"[{\\\"number\\\":1,\\\"name\\\":\\\"stringField\\\",\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"" + - "LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":2,\\\"name\\\":\\\"doubleField\\\",\\\"type\\\"" + - ":\\\"DOUBLE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":6,\\\"name\\\"" + - ":\\\"intField\\\",\\\"type\\\":\\\"INT32\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null}," + + "{\"name\":\"doubleField\",\"type\":\"double\",\"default\":0},{\"name\":\"intField\"," + + "\"type\":\"int\",\"default\":0},{\"name\":\"testEnum\",\"type\":{\"type\":\"enum\",\"name\":\"TestEnum\"," + + "\"symbols\":[\"SHARED\",\"FAILOVER\"]},\"default\":\"SHARED\"},{\"name\":\"nestedField\",\"type\":[\"null\"," + + "{\"type\":\"record\",\"name\":\"SubMessage\",\"fields\":[{\"name\":\"foo\",\"type\":{\"type\":\"string\"," + + "\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"bar\",\"type\":\"double\",\"default\":0}]}]," + + "\"default\":null},{\"name\":\"repeatedField\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\"," + + "\"avro.java.string\":\"String\"}}},{\"name\":\"externalMessage\",\"type\":[\"null\",{\"type\":\"record\"," + + "\"name\":\"ExternalMessage\",\"namespace\":\"org.apache.pulsar.client.schema.proto.ExternalTest\"," + + "\"fields\":[{\"name\":\"stringField\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}," + + "\"default\":\"\"},{\"name\":\"doubleField\",\"type\":\"double\",\"default\":0}]}],\"default\":null}]}"; + + private static final String EXPECTED_PARSING_INFO = "{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"," + + "\"__PARSING_INFO__\":\"[{\\\"number\\\":1,\\\"name\\\":\\\"stringField\\\",\\\"type\\\":\\\"STRING\\\"," + + "\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":2,\\\"name\\\":\\\"doubleField\\\"," + + "\\\"type\\\":\\\"DOUBLE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":6," + + "\\\"name\\\":\\\"intField\\\",\\\"type\\\":\\\"INT32\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null}," + "{\\\"number\\\":4,\\\"name\\\":\\\"testEnum\\\",\\\"type\\\":\\\"ENUM\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," + - "\\\"definition\\\":null},{\\\"number\\\":5,\\\"name\\\":\\\"nestedField\\\",\\\"type\\\":\\\"MESSAGE\\\",\\\"" + - "label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":10,\\\"name\\\":\\\"repeatedField\\\"," + - "\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_REPEATED\\\",\\\"definition\\\":null}]\"}"; + "\\\"definition\\\":null},{\\\"number\\\":5,\\\"name\\\":\\\"nestedField\\\",\\\"type\\\":\\\"MESSAGE\\\"," + + "\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":10,\\\"name\\\":\\\"repeatedField\\\"," + + "\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_REPEATED\\\",\\\"definition\\\":null},{\\\"number\\\":11," + + "\\\"name\\\":\\\"externalMessage\\\",\\\"type\\\":\\\"MESSAGE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null}]\"}"; @Test public void testEncodeAndDecode() { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/AbstractGenericSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/AbstractGenericSchemaTest.java new file mode 100644 index 0000000000000..0865579ffedd4 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/AbstractGenericSchemaTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; +import org.testng.annotations.Test; + +import java.util.concurrent.CompletableFuture; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.Mockito.*; +import static org.testng.Assert.assertEquals; + +/** + * Unit testing AbstractGenericSchema for non-avroBasedGenericSchema. + */ +@Slf4j +public class AbstractGenericSchemaTest { + + @Test + public void testGenericProtobufNativeSchema() { + Schema encodeSchema = Schema.PROTOBUF_NATIVE(org.apache.pulsar.client.schema.proto.Test.TestMessage.class); + GenericSchema decodeSchema = GenericProtobufNativeSchema.of(encodeSchema.getSchemaInfo()); + + testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); + } + + @Test + public void testAutoProtobufNativeSchema() { + // configure the schema info provider + MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class); + GenericSchema genericProtobufNativeSchema = GenericProtobufNativeSchema.of(Schema.PROTOBUF_NATIVE(org.apache.pulsar.client.schema.proto.Test.TestMessage.class).getSchemaInfo()); + when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class))) + .thenReturn(CompletableFuture.completedFuture(genericProtobufNativeSchema.getSchemaInfo())); + + // configure encode schema + Schema encodeSchema = Schema.PROTOBUF_NATIVE(org.apache.pulsar.client.schema.proto.Test.TestMessage.class); + // configure decode schema + AutoConsumeSchema decodeSchema = new AutoConsumeSchema(); + decodeSchema.configureSchemaInfo("test-topic", "topic", encodeSchema.getSchemaInfo()); + decodeSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider); + + testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); + } + + private void testEncodeAndDecodeGenericRecord(Schema encodeSchema, + Schema decodeSchema) { + int numRecords = 10; + for (int i = 0; i < numRecords; i++) { + org.apache.pulsar.client.schema.proto.Test.TestMessage testMessage = newTestMessage(i); + byte[] data = encodeSchema.encode(testMessage); + + log.info("Decoding : {}", new String(data, UTF_8)); + + GenericRecord record; + if (decodeSchema instanceof AutoConsumeSchema) { + record = decodeSchema.decode(data, new byte[0]); + } else { + record = decodeSchema.decode(data); + } + verifyTestMessageRecord(record, i); + } + } + + + private static org.apache.pulsar.client.schema.proto.Test.TestMessage newTestMessage(int i) { + return org.apache.pulsar.client.schema.proto.Test.TestMessage.newBuilder().setStringField("field-value-" + i) + .setIntField(i).build(); + } + + private static void verifyTestMessageRecord(GenericRecord record, int i) { + Object stringField = record.getField("stringField"); + assertEquals("field-value-" + i, stringField); + Object intField = record.getField("intField"); + assertEquals(+i, intField); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReaderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReaderTest.java new file mode 100644 index 0000000000000..985f37dff3097 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReaderTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema; +import org.apache.pulsar.client.schema.proto.Test.TestMessage; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +@Slf4j +public class GenericProtobufNativeReaderTest { + + private TestMessage message; + private GenericRecord genericmessage; + private GenericProtobufNativeSchema genericProtobufNativeSchema; + private ProtobufNativeSchema clazzBasedProtobufNativeSchema; + + + @BeforeMethod + public void setup() { + clazzBasedProtobufNativeSchema = ProtobufNativeSchema.of(SchemaDefinition.builder() + .withPojo(TestMessage.class).build()); + genericProtobufNativeSchema = (GenericProtobufNativeSchema) GenericProtobufNativeSchema.of(clazzBasedProtobufNativeSchema.getSchemaInfo()); + + } + + @Test + public void testGenericReaderByClazzBasedWriterSchema() { + message = TestMessage.newBuilder().setStringField(STRING_FIELD_VLUE).setDoubleField(DOUBLE_FIELD_VLUE).build(); + GenericProtobufNativeReader genericProtobufNativeReader = new GenericProtobufNativeReader(genericProtobufNativeSchema.getProtobufNativeSchema()); + GenericRecord genericRecordByWriterSchema = genericProtobufNativeReader.read(message.toByteArray()); + assertEquals(genericRecordByWriterSchema.getField("stringField"), STRING_FIELD_VLUE); + assertEquals(genericRecordByWriterSchema.getField("doubleField"), DOUBLE_FIELD_VLUE); + } + + @Test + public void testClazzBasedReaderByGenericWriterSchema() { + genericmessage = genericProtobufNativeSchema.newRecordBuilder().set("stringField", STRING_FIELD_VLUE).set("doubleField", DOUBLE_FIELD_VLUE).build(); + byte[] messageBytes = new GenericProtobufNativeWriter().write(genericmessage); + GenericProtobufNativeReader genericProtobufNativeReader = new GenericProtobufNativeReader(clazzBasedProtobufNativeSchema.getProtobufNativeSchema()); + GenericRecord genericRecordByWriterSchema = genericProtobufNativeReader.read(messageBytes); + assertEquals(genericRecordByWriterSchema.getField("stringField"), STRING_FIELD_VLUE); + assertEquals(genericRecordByWriterSchema.getField("doubleField"), DOUBLE_FIELD_VLUE); + + } + + private final static String STRING_FIELD_VLUE = "stringFieldValue"; + private final static double DOUBLE_FIELD_VLUE = 0.2D; + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeSchemaTest.java new file mode 100644 index 0000000000000..886d7f8105cb0 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeSchemaTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema; +import org.apache.pulsar.client.schema.proto.Test.TestMessage; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +public class GenericProtobufNativeSchemaTest { + + private TestMessage message; + private GenericRecord genericmessage; + private GenericProtobufNativeSchema genericProtobufNativeSchema; + private ProtobufNativeSchema clazzBasedProtobufNativeSchema; + + @BeforeMethod + public void init() { + clazzBasedProtobufNativeSchema = ProtobufNativeSchema.of(SchemaDefinition.builder() + .withPojo(TestMessage.class).build()); + genericProtobufNativeSchema = (GenericProtobufNativeSchema) GenericProtobufNativeSchema.of(clazzBasedProtobufNativeSchema.getSchemaInfo()); + + } + + @Test + public void testGenericReaderByClazzBasedWriterSchema() { + message = TestMessage.newBuilder().setStringField(STRING_FIELD_VLUE).setDoubleField(DOUBLE_FIELD_VLUE).build(); + byte[] clazzBasedProtobufBytes = clazzBasedProtobufNativeSchema.encode(message); + GenericRecord genericRecord = genericProtobufNativeSchema.decode(clazzBasedProtobufBytes); + assertEquals(genericRecord.getField("stringField"), STRING_FIELD_VLUE); + assertEquals(genericRecord.getField("doubleField"), DOUBLE_FIELD_VLUE); + } + + @Test + public void testClazzBasedReaderByClazzGenericWriterSchema() { + genericmessage = genericProtobufNativeSchema.newRecordBuilder().set("stringField", STRING_FIELD_VLUE).set("doubleField", DOUBLE_FIELD_VLUE).build(); + byte[] messageBytes = genericProtobufNativeSchema.encode(genericmessage); + message = clazzBasedProtobufNativeSchema.decode(messageBytes); + assertEquals(message.getStringField(), STRING_FIELD_VLUE); + assertEquals(message.getDoubleField(), DOUBLE_FIELD_VLUE); + } + + private final static String STRING_FIELD_VLUE = "stringFieldValue"; + private final static double DOUBLE_FIELD_VLUE = 0.2D; + +} diff --git a/pulsar-client/src/test/proto/ExternalTest.proto b/pulsar-client/src/test/proto/ExternalTest.proto new file mode 100644 index 0000000000000..dac5d48038590 --- /dev/null +++ b/pulsar-client/src/test/proto/ExternalTest.proto @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +syntax = "proto3"; +package proto.external; + +option java_package = "org.apache.pulsar.client.schema.proto"; +option java_outer_classname = "ExternalTest"; + +message ExternalMessage { + string stringField = 1; + double doubleField = 2; +} \ No newline at end of file diff --git a/pulsar-client/src/test/proto/Test.proto b/pulsar-client/src/test/proto/Test.proto index 7d7b1b64ab2d7..00515e9404db0 100644 --- a/pulsar-client/src/test/proto/Test.proto +++ b/pulsar-client/src/test/proto/Test.proto @@ -19,6 +19,8 @@ syntax = "proto3"; package proto; +import "ExternalTest.proto"; + option java_package = "org.apache.pulsar.client.schema.proto"; option java_outer_classname = "Test"; @@ -30,6 +32,11 @@ enum TestEnum { message SubMessage { string foo = 1; double bar = 2; + message NestedMessage { + string url = 1; + string title = 2; + repeated string snippets = 3; + } } message TestMessage { @@ -39,4 +46,5 @@ message TestMessage { TestEnum testEnum = 4; SubMessage nestedField = 5; repeated string repeatedField = 10; + proto.external.ExternalMessage externalMessage = 11; } \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index cfbd17fec9bcc..a811a164d6d7b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -451,6 +451,7 @@ public enum Type LocalDate(17, 17), LocalTime(18, 18), LocalDateTime(19, 19), + ProtobufNative(20, 20), ; public static final int None_VALUE = 0; @@ -473,6 +474,7 @@ public enum Type public static final int LocalDate_VALUE = 17; public static final int LocalTime_VALUE = 18; public static final int LocalDateTime_VALUE = 19; + public static final int ProtobufNative_VALUE = 20; public final int getNumber() { return value; } @@ -499,6 +501,7 @@ public static Type valueOf(int value) { case 17: return LocalDate; case 18: return LocalTime; case 19: return LocalDateTime; + case 20: return ProtobufNative; default: return null; } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/ProtobufNativeSchemaData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/ProtobufNativeSchemaData.java new file mode 100644 index 0000000000000..069b95c0ed0cf --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/ProtobufNativeSchemaData.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.protocol.schema; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * POJO class used for serialize to json-string for SchemaInfo.schema when SchemaType is SchemaType.PROTOBUF_NATIVE. + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class ProtobufNativeSchemaData { + /** + * protobuf v3 FileDescriptorSet bytes. + **/ + private byte[] fileDescriptorSet; + /** + * protobuf v3 rootMessageTypeName. + **/ + private String rootMessageTypeName; + /** + * protobuf v3 rootFileDescriptorName. + **/ + private String rootFileDescriptorName; + +} diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 78401fc61bcea..b6adde15cb56a 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -44,6 +44,7 @@ message Schema { LocalDate = 17; LocalTime = 18; LocalDateTime = 19; + ProtobufNative = 20; } required string name = 1;