Skip to content

Commit

Permalink
[ISSUE-328] get last message when asking for LatestMessageID and incl…
Browse files Browse the repository at this point in the history
…usive

Signed-off-by: Paulo Pereira <[email protected]>
  • Loading branch information
Paulo Pereira committed Jul 22, 2020
1 parent 45bd77b commit d410552
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 25 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.8
github.com/kr/pretty v0.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
Expand Down
37 changes: 29 additions & 8 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ var (
Help: "Time it takes for application to process messages",
Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
})

lastestMessageID = LatestMessageID()
)

type consumerState int
Expand Down Expand Up @@ -191,6 +193,19 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
pc.log.Info("Created consumer")
pc.state = consumerReady

if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID {
msgID, err := pc.requestGetLastMessageID()
if err != nil {
return nil, err
}
pc.startMessageID = msgID

err = pc.requestSeek(msgID)
if err != nil {
return nil, err
}
}

go pc.dispatcher()

go pc.runEventsLoop()
Expand Down Expand Up @@ -250,7 +265,10 @@ func (pc *partitionConsumer) getLastMessageID() (messageID, error) {

func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
defer close(req.doneCh)
req.msgID, req.err = pc.requestGetLastMessageID()
}

func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
requestID := pc.client.rpcClient.NewRequestID()
cmdGetLastMessageID := &pb.CommandGetLastMessageId{
RequestId: proto.Uint64(requestID),
Expand All @@ -260,11 +278,10 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
if err != nil {
pc.log.WithError(err).Error("Failed to get last message id")
req.err = err
} else {
id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
req.msgID = convertToMessageID(id)
return messageID{}, err
}
id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
return convertToMessageID(id), nil
}

func (pc *partitionConsumer) AckID(msgID messageID) {
Expand Down Expand Up @@ -340,17 +357,20 @@ func (pc *partitionConsumer) Seek(msgID messageID) error {

func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
defer close(seek.doneCh)
seek.err = pc.requestSeek(seek.msgID)
}

func (pc *partitionConsumer) requestSeek(msgID messageID) error {
if pc.state == consumerClosing || pc.state == consumerClosed {
pc.log.Error("Consumer was already closed")
return
return nil
}

id := &pb.MessageIdData{}
err := proto.Unmarshal(seek.msgID.Serialize(), id)
err := proto.Unmarshal(msgID.Serialize(), id)
if err != nil {
pc.log.WithError(err).Errorf("deserialize message id error: %s", err.Error())
seek.err = err
return err
}

requestID := pc.client.rpcClient.NewRequestID()
Expand All @@ -363,8 +383,9 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
_, err = pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek)
if err != nil {
pc.log.WithError(err).Error("Failed to reset to message id")
seek.err = err
return err
}
return nil
}

func (pc *partitionConsumer) SeekByTime(time time.Time) error {
Expand Down
17 changes: 0 additions & 17 deletions pulsar/reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ var (
Name: "pulsar_client_readers_closed",
Help: "Counter of readers closed by the client",
})

lastestMessageID = LatestMessageID()
)

type reader struct {
Expand Down Expand Up @@ -117,21 +115,6 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
return nil, err
}

if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID {
msgID, err := pc.getLastMessageID()
if err != nil {
return nil, err
}
pc.startMessageID = msgID

err = pc.Seek(msgID)
if err != nil {
return nil, err
}

reader.lastMessageInBroker = msgID
}

reader.pc = pc
readersOpened.Inc()
return reader, nil
Expand Down

0 comments on commit d410552

Please sign in to comment.