From 7f306f435e8fc5bfafee259713930f66c061e34d Mon Sep 17 00:00:00 2001 From: guangning Date: Tue, 24 Dec 2019 21:52:13 +0800 Subject: [PATCH 1/2] Fix cpp client schema version --- pulsar-client-cpp/lib/ClientConnection.cc | 4 ++-- pulsar-client-cpp/lib/Commands.cc | 2 +- pulsar-client-cpp/lib/ProducerImpl.cc | 3 +++ pulsar-client-cpp/tests/SchemaTest.cc | 3 +++ pulsar-client-cpp/wireshark/pulsarDissector.cc | 4 ++-- 5 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 384789aea96fc..dbd6b619ff4ae 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -527,7 +527,7 @@ void ClientConnection::processIncomingBuffer() { // At this point, we have at least one complete frame available in the buffer uint32_t cmdSize = incomingBuffer_.readUnsignedInt(); - if (!incomingCmd_.ParseFromArray(incomingBuffer_.data(), cmdSize)) { + if (!incomingCmd_.ParsePartialFromArray(incomingBuffer_.data(), cmdSize)) { LOG_ERROR(cnxString_ << "Error parsing protocol buffer command"); close(); return; @@ -544,7 +544,7 @@ void ClientConnection::processIncomingBuffer() { bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd_); uint32_t metadataSize = incomingBuffer_.readUnsignedInt(); - if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) { + if (!msgMetadata.ParsePartialFromArray(incomingBuffer_.data(), metadataSize)) { LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd_.message().consumer_id() // << ", message ledger id " << incomingCmd_.message().message_id().ledgerid() // diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 7d1f53e951450..bda47d10c23b5 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -650,7 +650,7 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32 const int& singleMetaSize = uncompressedPayload.readUnsignedInt(); SingleMessageMetadata metadata; - metadata.ParseFromArray(uncompressedPayload.data(), singleMetaSize); + metadata.ParsePartialFromArray(uncompressedPayload.data(), singleMetaSize); uncompressedPayload.consume(singleMetaSize); const int& payloadSize = metadata.payload_size(); diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index d658051a22d62..f38ed2ae5934f 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -287,6 +287,9 @@ void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequen msgMetadata.set_compression(CompressionCodecProvider::convertType(conf_.getCompressionType())); msgMetadata.set_uncompressed_size(uncompressedSize); } + if (!this->getSchemaVersion().empty()) { + msgMetadata.set_schema_version(this->getSchemaVersion()); + } } void ProducerImpl::statsCallBackHandler(Result res, const MessageId& msgId, SendCallback callback, diff --git a/pulsar-client-cpp/tests/SchemaTest.cc b/pulsar-client-cpp/tests/SchemaTest.cc index 7ad7b9e8928db..e11069fb06b8a 100644 --- a/pulsar-client-cpp/tests/SchemaTest.cc +++ b/pulsar-client-cpp/tests/SchemaTest.cc @@ -36,6 +36,9 @@ TEST(SchemaTest, testSchema) { ProducerConfiguration producerConf; producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema)); res = client.createProducer("topic-avro", producerConf, producer); + + // Check schema version + ASSERT_FALSE(producer.getSchemaVersion().empty()); producer.close(); ASSERT_EQ(ResultOk, res); diff --git a/pulsar-client-cpp/wireshark/pulsarDissector.cc b/pulsar-client-cpp/wireshark/pulsarDissector.cc index 6a14264be3108..b418e1dd3f4da 100644 --- a/pulsar-client-cpp/wireshark/pulsarDissector.cc +++ b/pulsar-client-cpp/wireshark/pulsarDissector.cc @@ -200,7 +200,7 @@ static void dissect_message_metadata(proto_tree* frame_tree, tvbuff_t *tvb, int static MessageMetadata msgMetadata; uint8_t* ptr = (uint8_t*) tvb_get_ptr(tvb, offset, metadataSize); - if (!msgMetadata.ParseFromArray(ptr, metadataSize)) { + if (!msgMetadata.ParsePartialFromArray(ptr, metadataSize)) { proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, offset, metadataSize, true, "Error parsing protocol buffer message metadata"); return; @@ -315,7 +315,7 @@ static int dissect_pulsar_message(tvbuff_t *tvb, packet_info* pinfo, proto_tree* } uint8_t* ptr = (uint8_t*) tvb_get_ptr(tvb, offset, cmdSize); - if (!command.ParseFromArray(ptr, cmdSize)) { + if (!command.ParsePartialFromArray(ptr, cmdSize)) { proto_tree_add_boolean_format(tree, hf_pulsar_error, tvb, offset, cmdSize, true, "Error parsing protocol buffer command"); return maxOffset; From 5508c13a42392496c6708633954074d99ee46f53 Mon Sep 17 00:00:00 2001 From: guangning Date: Wed, 25 Dec 2019 20:31:29 +0800 Subject: [PATCH 2/2] Revert code to ParseFromArray --- pulsar-client-cpp/lib/ClientConnection.cc | 4 ++-- pulsar-client-cpp/lib/Commands.cc | 2 +- pulsar-client-cpp/wireshark/pulsarDissector.cc | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index dbd6b619ff4ae..384789aea96fc 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -527,7 +527,7 @@ void ClientConnection::processIncomingBuffer() { // At this point, we have at least one complete frame available in the buffer uint32_t cmdSize = incomingBuffer_.readUnsignedInt(); - if (!incomingCmd_.ParsePartialFromArray(incomingBuffer_.data(), cmdSize)) { + if (!incomingCmd_.ParseFromArray(incomingBuffer_.data(), cmdSize)) { LOG_ERROR(cnxString_ << "Error parsing protocol buffer command"); close(); return; @@ -544,7 +544,7 @@ void ClientConnection::processIncomingBuffer() { bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd_); uint32_t metadataSize = incomingBuffer_.readUnsignedInt(); - if (!msgMetadata.ParsePartialFromArray(incomingBuffer_.data(), metadataSize)) { + if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) { LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd_.message().consumer_id() // << ", message ledger id " << incomingCmd_.message().message_id().ledgerid() // diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index bda47d10c23b5..7d1f53e951450 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -650,7 +650,7 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32 const int& singleMetaSize = uncompressedPayload.readUnsignedInt(); SingleMessageMetadata metadata; - metadata.ParsePartialFromArray(uncompressedPayload.data(), singleMetaSize); + metadata.ParseFromArray(uncompressedPayload.data(), singleMetaSize); uncompressedPayload.consume(singleMetaSize); const int& payloadSize = metadata.payload_size(); diff --git a/pulsar-client-cpp/wireshark/pulsarDissector.cc b/pulsar-client-cpp/wireshark/pulsarDissector.cc index b418e1dd3f4da..6a14264be3108 100644 --- a/pulsar-client-cpp/wireshark/pulsarDissector.cc +++ b/pulsar-client-cpp/wireshark/pulsarDissector.cc @@ -200,7 +200,7 @@ static void dissect_message_metadata(proto_tree* frame_tree, tvbuff_t *tvb, int static MessageMetadata msgMetadata; uint8_t* ptr = (uint8_t*) tvb_get_ptr(tvb, offset, metadataSize); - if (!msgMetadata.ParsePartialFromArray(ptr, metadataSize)) { + if (!msgMetadata.ParseFromArray(ptr, metadataSize)) { proto_tree_add_boolean_format(frame_tree, hf_pulsar_error, tvb, offset, metadataSize, true, "Error parsing protocol buffer message metadata"); return; @@ -315,7 +315,7 @@ static int dissect_pulsar_message(tvbuff_t *tvb, packet_info* pinfo, proto_tree* } uint8_t* ptr = (uint8_t*) tvb_get_ptr(tvb, offset, cmdSize); - if (!command.ParsePartialFromArray(ptr, cmdSize)) { + if (!command.ParseFromArray(ptr, cmdSize)) { proto_tree_add_boolean_format(tree, hf_pulsar_error, tvb, offset, cmdSize, true, "Error parsing protocol buffer command"); return maxOffset;