-
Notifications
You must be signed in to change notification settings - Fork 346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Exposing broker metadata #745
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -498,12 +498,17 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header | |
pbMsgID := response.GetMessageId() | ||
|
||
reader := internal.NewMessageReader(headersAndPayload) | ||
brokerMetadata, err := reader.ReadBrokerMetadata() | ||
if err != nil { | ||
// todo optimize use more appropriate error codes | ||
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError) | ||
return err | ||
} | ||
msgMeta, err := reader.ReadMessageMetadata() | ||
if err != nil { | ||
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_ChecksumMismatch) | ||
return err | ||
} | ||
|
||
decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta) | ||
// error decrypting the payload | ||
if err != nil { | ||
|
@@ -597,7 +602,18 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header | |
pc.AckID(msgID) | ||
continue | ||
} | ||
|
||
var messageIndex *uint64 | ||
var brokerPublishTime *time.Time | ||
if brokerMetadata != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like we've checked above that the brokerMetadata object is nil by doing the following:
Do we still need to double check here? Will someone concurrently modify the value of the brokerMetadata object? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
if brokerMetadata.Index != nil { | ||
aux := brokerMetadata.GetIndex() - uint64(numMsgs) + uint64(i) + 1 | ||
messageIndex = &aux | ||
} | ||
if brokerMetadata.BrokerTimestamp != nil { | ||
aux := timeFromUnixTimestampMillis(*brokerMetadata.BrokerTimestamp) | ||
brokerPublishTime = &aux | ||
} | ||
} | ||
// set the consumer so we know how to ack the message id | ||
msgID.consumer = pc | ||
var msg *message | ||
|
@@ -616,6 +632,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header | |
replicatedFrom: msgMeta.GetReplicatedFrom(), | ||
redeliveryCount: response.GetRedeliveryCount(), | ||
orderingKey: string(smm.OrderingKey), | ||
index: messageIndex, | ||
brokerPublishTime: brokerPublishTime, | ||
} | ||
} else { | ||
msg = &message{ | ||
|
@@ -631,6 +649,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header | |
replicationClusters: msgMeta.GetReplicateTo(), | ||
replicatedFrom: msgMeta.GetReplicatedFrom(), | ||
redeliveryCount: response.GetRedeliveryCount(), | ||
index: messageIndex, | ||
brokerPublishTime: brokerPublishTime, | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
package internal | ||
|
||
import ( | ||
"encoding/binary" | ||
"errors" | ||
"fmt" | ||
|
||
|
@@ -34,8 +35,9 @@ const ( | |
// MessageFramePadding is for metadata and other frame headers | ||
MessageFramePadding = 10 * 1024 | ||
// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent. | ||
MaxFrameSize = MaxMessageSize + MessageFramePadding | ||
magicCrc32c uint16 = 0x0e01 | ||
MaxFrameSize = MaxMessageSize + MessageFramePadding | ||
magicCrc32c uint16 = 0x0e01 | ||
magicBrokerEntryMetadata uint16 = 0x0e02 | ||
) | ||
|
||
// ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data. | ||
|
@@ -119,6 +121,20 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) { | |
return &meta, nil | ||
} | ||
|
||
func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) { | ||
magicNumber := binary.BigEndian.Uint16(r.buffer.Get(r.buffer.ReaderIndex(), 2)) | ||
if magicNumber != magicBrokerEntryMetadata { | ||
return nil, nil | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is the error object returned as nil here? If we don't need this error field, maybe we can cancel it in function definition? Or we can use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not an error, it means broker not enable |
||
r.buffer.Skip(2) | ||
size := r.buffer.ReadUint32() | ||
var brokerEntryMetadata pb.BrokerEntryMetadata | ||
if err := proto.Unmarshal(r.buffer.Read(size), &brokerEntryMetadata); err != nil { | ||
return nil, err | ||
} | ||
return &brokerEntryMetadata, nil | ||
} | ||
|
||
func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) { | ||
if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 { | ||
return nil, nil, ErrEOM | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe instead of using pointers we can use the following definition:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wolfstudy It can be null if broker not enable
brokerMetadata
feature