Skip to content

Commit

Permalink
add messageId and topic as props of DLQ message (#907)
Browse files Browse the repository at this point in the history
Co-authored-by: Prabhudas Garule <[email protected]>
  • Loading branch information
GPrabhudas and Prabhudas Garule authored Jan 10, 2023
1 parent 44dc85c commit cf031b8
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 1 deletion.
6 changes: 6 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1505,6 +1505,12 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {

expectMsg := fmt.Sprintf("hello-%d", expectedMsgIdx)
assert.Equal(t, []byte(expectMsg), msg.Payload())

// check original messageId
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])

// check original topic
assert.NotEmpty(t, msg.Properties()[SysPropertyRealTopic])
}

// No more messages on the DLQ
Expand Down
12 changes: 11 additions & 1 deletion pulsar/dlq_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,21 @@ func (r *dlqRouter) run() {
producer := r.getProducer(cm.Consumer.(*consumer).options.Schema)
msg := cm.Message.(*message)
msgID := msg.ID()

// properties associated with original message
properties := msg.Properties()

// include orinal message id in string format in properties
properties[PropertyOriginMessageID] = msgID.String()

// include original topic name of the message in properties
properties[SysPropertyRealTopic] = msg.Topic()

producer.SendAsync(context.Background(), &ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
OrderingKey: msg.OrderingKey(),
Properties: msg.Properties(),
Properties: properties,
EventTime: msg.EventTime(),
ReplicationClusters: msg.replicationClusters,
}, func(MessageID, *ProducerMessage, error) {
Expand Down
3 changes: 3 additions & 0 deletions pulsar/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ type MessageID interface {

// PartitionIdx returns the message partitionIdx
PartitionIdx() int32

// String returns message id in string format
String() string
}

// DeserializeMessageID reconstruct a MessageID object from its serialized representation
Expand Down
8 changes: 8 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,14 @@ func (id *myMessageID) PartitionIdx() int32 {
return id.PartitionIdx()
}

func (id *myMessageID) String() string {
mid, err := DeserializeMessageID(id.data)
if err != nil {
return ""
}
return fmt.Sprintf("%d:%d:%d", mid.LedgerID(), mid.EntryID(), mid.PartitionIdx())
}

func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
Expand Down
1 change: 1 addition & 0 deletions pulsar/retry_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
SysPropertyRetryTopic = "RETRY_TOPIC"
SysPropertyReconsumeTimes = "RECONSUMETIMES"
SysPropertyOriginMessageID = "ORIGIN_MESSAGE_IDY_TIME"
PropertyOriginMessageID = "ORIGIN_MESSAGE_ID"
)

type RetryMessage struct {
Expand Down

0 comments on commit cf031b8

Please sign in to comment.