diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 7ddff5e4ad..5b61e7dcb4 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -82,6 +82,15 @@ const (
 	noMessageEntry = -1
 )
 
+type permitsReq int32
+
+const (
+	// reset the availablePermits of pc
+	permitsReset permitsReq = iota
+	// increase the availablePermits
+	permitsInc
+)
+
 type partitionConsumerOpts struct {
 	topic        string
 	consumerName string
@@ -128,7 +137,8 @@ type partitionConsumer struct {
 	messageCh chan ConsumerMessage
 
 	// the number of message slots available
-	availablePermits int32
+	availablePermits   int32
+	availablePermitsCh chan permitsReq
 
 	// the size of the queue channel for buffering messages
 	queueSize int32
@@ -224,6 +234,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 		dlq:                  dlq,
 		metrics:              metrics,
 		schemaInfoCache:      newSchemaInfoCache(client, options.topic),
+		availablePermitsCh:   make(chan permitsReq, 10),
 	}
 	pc.setConsumerState(consumerInit)
 	pc.log = client.log.SubLogger(log.Fields{
@@ -932,7 +943,7 @@ func (pc *partitionConsumer) dispatcher() {
 			messages = nil
 
 			// reset available permits
-			pc.availablePermits = 0
+			pc.availablePermitsCh <- permitsReset
 			initialPermits := uint32(pc.queueSize)
 
 			pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
@@ -955,19 +966,14 @@
 			messages[0] = nil
 			messages = messages[1:]
 
-			// TODO implement a better flow controller
-			// send more permits if needed
-			pc.availablePermits++
-			flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
-			if pc.availablePermits >= flowThreshold {
-				availablePermits := pc.availablePermits
-				requestedPermits := availablePermits
-				pc.availablePermits = 0
+			pc.availablePermitsCh <- permitsInc
 
-				pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
-				if err := pc.internalFlow(uint32(requestedPermits)); err != nil {
-					pc.log.WithError(err).Error("unable to send permits")
-				}
+		case pr := <-pc.availablePermitsCh:
+			switch pr {
+			case permitsInc:
+				pc.increasePermitsAndRequestMoreIfNeed()
+			case permitsReset:
+				pc.availablePermits = 0
 			}
 
 		case clearQueueCb := <-pc.clearQueueCh:
@@ -998,7 +1004,7 @@
 			messages = nil
 
 			// reset available permits
-			pc.availablePermits = 0
+			pc.availablePermitsCh <- permitsReset
 			initialPermits := uint32(pc.queueSize)
 
 			pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
@@ -1438,6 +1444,25 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
 	if err != nil {
 		pc.log.Error("Connection was closed when request ack cmd")
 	}
+	pc.availablePermitsCh <- permitsInc
+}
+
+func (pc *partitionConsumer) increasePermitsAndRequestMoreIfNeed() {
+	// TODO implement a better flow controller
+	// send more permits if needed
+	flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
+	pc.availablePermits++
+	ap := pc.availablePermits
+	if ap >= flowThreshold {
+		availablePermits := ap
+		requestedPermits := ap
+		pc.availablePermitsCh <- permitsReset
+
+		pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
+		if err := pc.internalFlow(uint32(requestedPermits)); err != nil {
+			pc.log.WithError(err).Error("unable to send permits")
+		}
+	}
 }
 
 // _setConn sets the internal connection field of this partition consumer atomically.
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index a180586215..f574378374 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"log"
@@ -3180,3 +3181,84 @@ func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) {
 		consumer.Ack(msg)
 	}
 }
+
+func TestAvailablePermitsLeak(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.Nil(t, err)
+	client.Close()
+
+	topic := fmt.Sprintf("my-topic-test-ap-leak-%v", time.Now().Nanosecond())
+
+	// 1. Producer with valid key name
+	p1, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+		Encryption: &ProducerEncryptionInfo{
+			KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+				"crypto/testdata/pri_key_rsa.pem"),
+			Keys: []string{"client-rsa.pem"},
+		},
+		Schema:          NewStringSchema(nil),
+		DisableBatching: true,
+	})
+	assert.Nil(t, err)
+	assert.NotNil(t, p1)
+
+	subscriptionName := "enc-failure-subcription"
+	totalMessages := 1000
+
+	// 2. KeyReader is not set by the consumer
+	// Receive should fail since KeyReader is not setup
+	// because default behaviour of consumer is fail receiving message if error in decryption
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: subscriptionName,
+	})
+	assert.Nil(t, err)
+
+	messageFormat := "my-message-%v"
+	for i := 0; i < totalMessages; i++ {
+		_, err := p1.Send(context.Background(), &ProducerMessage{
+			Value: fmt.Sprintf(messageFormat, i),
+		})
+		assert.Nil(t, err)
+	}
+
+	// 2. Set another producer that send message without crypto.
+	// The consumer can receive it correct.
+	p2, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		Schema:          NewStringSchema(nil),
+		DisableBatching: true,
+	})
+	assert.Nil(t, err)
+	assert.NotNil(t, p2)
+
+	_, err = p2.Send(context.Background(), &ProducerMessage{
+		Value: fmt.Sprintf(messageFormat, totalMessages),
+	})
+	assert.Nil(t, err)
+
+	// 3. Discard action on decryption failure. Create a availablePermits leak scenario
+	consumer.Close()
+
+	consumer, err = client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: subscriptionName,
+		Decryption: &MessageDecryptionInfo{
+			ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionDiscard,
+		},
+		Schema: NewStringSchema(nil),
+	})
+	assert.Nil(t, err)
+	assert.NotNil(t, consumer)
+
+	// 4. If availablePermits does not leak, consumer can get the last message which is no crypto.
+	// The ctx3 will not exceed deadline.
+	ctx3, cancel3 := context.WithTimeout(context.Background(), 15*time.Second)
+	_, err = consumer.Receive(ctx3)
+	cancel3()
+	assert.NotEqual(t, true, errors.Is(err, context.DeadlineExceeded),
+		"This means the resource is exhausted. consumer.Receive() will block forever.")
+}