diff --git a/pulsar/reader.go b/pulsar/reader.go index d58d06f6fe..5e1a73b988 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -134,4 +134,7 @@ type Reader interface { // the message publish time where to reposition the subscription // SeekByTime(time time.Time) error + + // GetLastMessageID get the last message id available for consume. + GetLastMessageID() (MessageID, error) } diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 5a2128a377..ffc92dedde 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -244,3 +244,7 @@ func (r *reader) SeekByTime(time time.Time) error { return r.pc.SeekByTime(time) } + +func (r *reader) GetLastMessageID() (MessageID, error) { + return r.pc.getLastMessageID() +} diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 0a1b2a1df4..ec10f8f162 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -901,3 +901,45 @@ func TestReaderWithBackoffPolicy(t *testing.T) { partitionConsumerImp.reconnectToBroker() assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) } + +func TestReaderGetLastMessageID(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + topic := newTopicName() + ctx := context.Background() + schema := NewStringSchema(nil) + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + Schema: schema, + }) + assert.Nil(t, err) + defer producer.Close() + + var lastMsgID MessageID + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + lastMsgID = msgID + } + + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + getLastMessageID, err := reader.GetLastMessageID() + if err != nil { + return + } + + assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID()) + assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID()) +}