From bb2910faddcb2ca1452ab62c91067d3559125750 Mon Sep 17 00:00:00 2001 From: Paulo Pereira Date: Wed, 22 Jul 2020 01:20:35 +0100 Subject: [PATCH 1/4] ISSUE-328 reader gets last message when LatestMessageID and inclusive Signed-off-by: Paulo Pereira --- pulsar/reader_impl.go | 17 +++++++++++++++ pulsar/reader_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index d97cc962c2..09edc35c09 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -40,6 +40,8 @@ var ( Name: "pulsar_client_readers_closed", Help: "Counter of readers closed by the client", }) + + lastestMessageID = LatestMessageID() ) type reader struct { @@ -115,6 +117,21 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, err } + if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID { + msgID, err := pc.getLastMessageID() + if err != nil { + return nil, err + } + pc.startMessageID = msgID + + err = pc.Seek(msgID) + if err != nil { + return nil, err + } + + reader.lastMessageInBroker = msgID + } + reader.pc = pc readersOpened.Inc() return reader, nil diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index d99bfcb5f7..dcaf791618 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -446,3 +446,54 @@ func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) { assert.Equal(t, []byte(expectMsg), msg.Payload()) } } + +func TestReaderLatestInclusiveHasNext(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + var lastMsgID MessageID + for i := 0; i < 10; i++ { + lastMsgID, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, lastMsgID) + } + + // create reader on the last message (inclusive) + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: LatestMessageID(), + StartMessageIDInclusive: true, + }) + + assert.Nil(t, err) + defer reader.Close() + + var msgID MessageID + if reader.HasNext() { + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) + + assert.Equal(t, []byte("hello-9"), msg.Payload()) + msgID = msg.ID() + } + + assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize()) +} From 0347336e8016b68c91fd476ecfcd05a34988b11e Mon Sep 17 00:00:00 2001 From: Paulo Pereira Date: Wed, 22 Jul 2020 03:31:35 +0100 Subject: [PATCH 2/4] [ISSUE-328] get last message when asking for LatestMessageID and inclusive Signed-off-by: Paulo Pereira --- go.mod | 2 ++ pulsar/consumer_partition.go | 37 ++++++++++++++++++++++++++++-------- pulsar/reader_impl.go | 17 ----------------- 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index 45b6241496..3e41d064c4 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,8 @@ require ( github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/klauspost/compress v1.10.8 github.com/kr/pretty v0.2.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/pierrec/lz4 v2.0.5+incompatible github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.7.1 diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 0c723c8f06..a1af1f4816 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -76,6 +76,8 @@ var ( Help: "Time it takes for application to process messages", Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}, }) + + lastestMessageID = LatestMessageID() ) type consumerState int @@ -191,6 +193,19 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon pc.log.Info("Created consumer") pc.state = consumerReady + if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID { + msgID, err := pc.requestGetLastMessageID() + if err != nil { + return nil, err + } + pc.startMessageID = msgID + + err = pc.requestSeek(msgID) + if err != nil { + return nil, err + } + } + go pc.dispatcher() go pc.runEventsLoop() @@ -250,7 +265,10 @@ func (pc *partitionConsumer) getLastMessageID() (messageID, error) { func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) { defer close(req.doneCh) + req.msgID, req.err = pc.requestGetLastMessageID() +} +func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) { requestID := pc.client.rpcClient.NewRequestID() cmdGetLastMessageID := &pb.CommandGetLastMessageId{ RequestId: proto.Uint64(requestID), @@ -260,11 +278,10 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID) if err != nil { pc.log.WithError(err).Error("Failed to get last message id") - req.err = err - } else { - id := res.Response.GetLastMessageIdResponse.GetLastMessageId() - req.msgID = convertToMessageID(id) + return messageID{}, err } + id := res.Response.GetLastMessageIdResponse.GetLastMessageId() + return convertToMessageID(id), nil } func (pc *partitionConsumer) AckID(msgID messageID) { @@ -340,17 +357,20 @@ func (pc *partitionConsumer) Seek(msgID messageID) error { func (pc *partitionConsumer) internalSeek(seek *seekRequest) { defer close(seek.doneCh) + seek.err = pc.requestSeek(seek.msgID) +} +func (pc *partitionConsumer) requestSeek(msgID messageID) error { if pc.state == consumerClosing || pc.state == consumerClosed { pc.log.Error("Consumer was already closed") - return + return nil } id := &pb.MessageIdData{} - err := proto.Unmarshal(seek.msgID.Serialize(), id) + err := proto.Unmarshal(msgID.Serialize(), id) if err != nil { pc.log.WithError(err).Errorf("deserialize message id error: %s", err.Error()) - seek.err = err + return err } requestID := pc.client.rpcClient.NewRequestID() @@ -363,8 +383,9 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) { _, err = pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek) if err != nil { pc.log.WithError(err).Error("Failed to reset to message id") - seek.err = err + return err } + return nil } func (pc *partitionConsumer) SeekByTime(time time.Time) error { diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 09edc35c09..d97cc962c2 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -40,8 +40,6 @@ var ( Name: "pulsar_client_readers_closed", Help: "Counter of readers closed by the client", }) - - lastestMessageID = LatestMessageID() ) type reader struct { @@ -117,21 +115,6 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, err } - if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID { - msgID, err := pc.getLastMessageID() - if err != nil { - return nil, err - } - pc.startMessageID = msgID - - err = pc.Seek(msgID) - if err != nil { - return nil, err - } - - reader.lastMessageInBroker = msgID - } - reader.pc = pc readersOpened.Inc() return reader, nil From 50ac51c2cf3bbcdf9ebccf588c51130e411ea483 Mon Sep 17 00:00:00 2001 From: Paulo Pereira Date: Wed, 22 Jul 2020 13:49:18 +0100 Subject: [PATCH 3/4] [ISSUE-328] test if there is no message Signed-off-by: Paulo Pereira --- pulsar/consumer_partition.go | 14 ++++++++++---- pulsar/reader_test.go | 12 ++++++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index a1af1f4816..c09ed548af 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -100,6 +100,10 @@ const ( nonDurable ) +const ( + noMessageEntry = -1 +) + type partitionConsumerOpts struct { topic string consumerName string @@ -198,11 +202,13 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon if err != nil { return nil, err } - pc.startMessageID = msgID + if msgID.entryID != noMessageEntry { + pc.startMessageID = msgID - err = pc.requestSeek(msgID) - if err != nil { - return nil, err + err = pc.requestSeek(msgID) + if err != nil { + return nil, err + } } } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index dcaf791618..08b949e923 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -458,6 +458,18 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) { topic := newTopicName() ctx := context.Background() + // create reader on the last message (inclusive) + reader0, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: LatestMessageID(), + StartMessageIDInclusive: true, + }) + + assert.Nil(t, err) + defer reader0.Close() + + assert.False(t, reader0.HasNext()) + // create producer producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, From 5ee5bb2e794d5d5b8d80076951bec053189abbfb Mon Sep 17 00:00:00 2001 From: Paulo Pereira Date: Thu, 23 Jul 2020 17:24:47 +0100 Subject: [PATCH 4/4] [ISSUE-328] rebase and tidy Signed-off-by: Paulo Pereira --- go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.mod b/go.mod index 3e41d064c4..45b6241496 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,6 @@ require ( github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/klauspost/compress v1.10.8 github.com/kr/pretty v0.2.0 // indirect - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect github.com/pierrec/lz4 v2.0.5+incompatible github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.7.1