Skip to content

Commit

Permalink
[Issue 833] Fix the availablePermits leak that could cause consumer s…
Browse files Browse the repository at this point in the history
…tuck. (#835)

* fix: fix for issue833

* fix: fix for issue833 by modify dispatcher()
  • Loading branch information
Gleiphir2769 authored Oct 13, 2022
1 parent c8e9195 commit a013ff0
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 15 deletions.
55 changes: 40 additions & 15 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ const (
noMessageEntry = -1
)

type permitsReq int32

const (
// reset the availablePermits of pc
permitsReset permitsReq = iota
// increase the availablePermits
permitsInc
)

type partitionConsumerOpts struct {
topic string
consumerName string
Expand Down Expand Up @@ -128,7 +137,8 @@ type partitionConsumer struct {
messageCh chan ConsumerMessage

// the number of message slots available
availablePermits int32
availablePermits int32
availablePermitsCh chan permitsReq

// the size of the queue channel for buffering messages
queueSize int32
Expand Down Expand Up @@ -224,6 +234,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
dlq: dlq,
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
availablePermitsCh: make(chan permitsReq, 10),
}
pc.setConsumerState(consumerInit)
pc.log = client.log.SubLogger(log.Fields{
Expand Down Expand Up @@ -932,7 +943,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil

// reset available permits
pc.availablePermits = 0
pc.availablePermitsCh <- permitsReset
initialPermits := uint32(pc.queueSize)

pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
Expand All @@ -955,19 +966,14 @@ func (pc *partitionConsumer) dispatcher() {
messages[0] = nil
messages = messages[1:]

// TODO implement a better flow controller
// send more permits if needed
pc.availablePermits++
flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
if pc.availablePermits >= flowThreshold {
availablePermits := pc.availablePermits
requestedPermits := availablePermits
pc.availablePermits = 0
pc.availablePermitsCh <- permitsInc

pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
if err := pc.internalFlow(uint32(requestedPermits)); err != nil {
pc.log.WithError(err).Error("unable to send permits")
}
case pr := <-pc.availablePermitsCh:
switch pr {
case permitsInc:
pc.increasePermitsAndRequestMoreIfNeed()
case permitsReset:
pc.availablePermits = 0
}

case clearQueueCb := <-pc.clearQueueCh:
Expand Down Expand Up @@ -998,7 +1004,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil

// reset available permits
pc.availablePermits = 0
pc.availablePermitsCh <- permitsReset
initialPermits := uint32(pc.queueSize)

pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
Expand Down Expand Up @@ -1438,6 +1444,25 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
if err != nil {
pc.log.Error("Connection was closed when request ack cmd")
}
pc.availablePermitsCh <- permitsInc
}

func (pc *partitionConsumer) increasePermitsAndRequestMoreIfNeed() {
// TODO implement a better flow controller
// send more permits if needed
flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
pc.availablePermits++
ap := pc.availablePermits
if ap >= flowThreshold {
availablePermits := ap
requestedPermits := ap
pc.availablePermitsCh <- permitsReset

pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
if err := pc.internalFlow(uint32(requestedPermits)); err != nil {
pc.log.WithError(err).Error("unable to send permits")
}
}
}

// _setConn sets the internal connection field of this partition consumer atomically.
Expand Down
82 changes: 82 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pulsar

import (
"context"
"errors"
"fmt"
"io/ioutil"
"log"
Expand Down Expand Up @@ -3180,3 +3181,84 @@ func TestConsumerSeekByTimeOnPartitionedTopic(t *testing.T) {
consumer.Ack(msg)
}
}

func TestAvailablePermitsLeak(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
client.Close()

topic := fmt.Sprintf("my-topic-test-ap-leak-%v", time.Now().Nanosecond())

// 1. Producer with valid key name
p1, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Encryption: &ProducerEncryptionInfo{
KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
"crypto/testdata/pri_key_rsa.pem"),
Keys: []string{"client-rsa.pem"},
},
Schema: NewStringSchema(nil),
DisableBatching: true,
})
assert.Nil(t, err)
assert.NotNil(t, p1)

subscriptionName := "enc-failure-subcription"
totalMessages := 1000

// 2. KeyReader is not set by the consumer
// Receive should fail since KeyReader is not setup
// because default behaviour of consumer is fail receiving message if error in decryption
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscriptionName,
})
assert.Nil(t, err)

messageFormat := "my-message-%v"
for i := 0; i < totalMessages; i++ {
_, err := p1.Send(context.Background(), &ProducerMessage{
Value: fmt.Sprintf(messageFormat, i),
})
assert.Nil(t, err)
}

// 2. Set another producer that send message without crypto.
// The consumer can receive it correct.
p2, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Schema: NewStringSchema(nil),
DisableBatching: true,
})
assert.Nil(t, err)
assert.NotNil(t, p2)

_, err = p2.Send(context.Background(), &ProducerMessage{
Value: fmt.Sprintf(messageFormat, totalMessages),
})
assert.Nil(t, err)

// 3. Discard action on decryption failure. Create a availablePermits leak scenario
consumer.Close()

consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: subscriptionName,
Decryption: &MessageDecryptionInfo{
ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionDiscard,
},
Schema: NewStringSchema(nil),
})
assert.Nil(t, err)
assert.NotNil(t, consumer)

// 4. If availablePermits does not leak, consumer can get the last message which is no crypto.
// The ctx3 will not exceed deadline.
ctx3, cancel3 := context.WithTimeout(context.Background(), 15*time.Second)
_, err = consumer.Receive(ctx3)
cancel3()
assert.NotEqual(t, true, errors.Is(err, context.DeadlineExceeded),
"This means the resource is exhausted. consumer.Receive() will block forever.")
}

0 comments on commit a013ff0

Please sign in to comment.