Skip to content

Commit

Permalink
[Fix] Fix the dispatcher() stuck caused by availablePermitsCh (#875)
Browse files Browse the repository at this point in the history
### Motivation

The `availablePermitsCh` may cause the dispatcher to get stuck.

https://github.com/apache/pulsar-client-go/blob/0b0720ab73d7f6378b8b6ac37acbafe547c268c8/pulsar/consumer_partition.go#L1096-L1109

For example, if the `messageCh <- nextMessage` case is continuously selected 10 times in a row, the `availablePermitsCh` buffer will be filled. The next `messageCh <- nextMessage` will then block forever, because the `pr := <-pc.availablePermitsCh` case can never be reached.

### Modifications

Remove the `pc.availablePermitsCh` from dispatcher.
  • Loading branch information
Gleiphir2769 authored Oct 31, 2022
1 parent 0b0720a commit 5679f1f
Showing 1 changed file with 40 additions and 44 deletions.
84 changes: 40 additions & 44 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -83,15 +84,6 @@ const (
noMessageEntry = -1
)

type permitsReq int32

const (
// reset the availablePermits of pc
permitsReset permitsReq = iota
// increase the availablePermits
permitsInc
)

type partitionConsumerOpts struct {
topic string
consumerName string
Expand Down Expand Up @@ -141,8 +133,7 @@ type partitionConsumer struct {
messageCh chan ConsumerMessage

// the number of message slots available
availablePermits int32
availablePermitsCh chan permitsReq
availablePermits *availablePermits

// the size of the queue channel for buffering messages
queueSize int32
Expand Down Expand Up @@ -170,6 +161,37 @@ type partitionConsumer struct {
unAckChunksTracker *unAckChunksTracker
}

// availablePermits tracks how many message slots have been consumed since the
// last flow request, so the consumer knows when to ask the broker for more.
type availablePermits struct {
	// permits is the count of accumulated permits; accessed only via sync/atomic.
	permits int32
	// pc is the owning partition consumer, used for queue size, logging,
	// and sending flow requests.
	pc *partitionConsumer
}

// inc records one more consumed message slot and, once at least half of the
// queue size has accumulated, atomically drains the counter and asks the
// broker for that many additional permits.
func (p *availablePermits) inc() {
	current := atomic.AddInt32(&p.permits, 1)

	// TODO implement a better flow controller
	// send more permits if needed
	flowThreshold := int32(math.Max(float64(p.pc.queueSize/2), 1))
	if current < flowThreshold {
		return
	}

	// Only the goroutine that wins the swap back to zero issues the flow
	// request; a lost CAS means another inc raced us and will trigger it.
	if !atomic.CompareAndSwapInt32(&p.permits, current, 0) {
		return
	}

	p.pc.log.Debugf("requesting more permits=%d available=%d", current, current)
	if err := p.pc.internalFlow(uint32(current)); err != nil {
		p.pc.log.WithError(err).Error("unable to send permits")
	}
}

// reset atomically discards any accumulated permits without sending a flow
// request; the caller is expected to request the initial permits itself
// (e.g. after a reconnect or seek).
func (p *availablePermits) reset() {
	atomic.StoreInt32(&p.permits, 0)
}

type schemaInfoCache struct {
lock sync.RWMutex
cache map[string]Schema
Expand Down Expand Up @@ -241,8 +263,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
dlq: dlq,
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
availablePermitsCh: make(chan permitsReq, 10),
}
pc.availablePermits = &availablePermits{pc: pc}
pc.chunkedMsgCtxMap = newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
pc.unAckChunksTracker = newUnAckChunksTracker(pc)
pc.setConsumerState(consumerInit)
Expand Down Expand Up @@ -931,14 +953,14 @@ func (pc *partitionConsumer) processMessageChunk(compressedPayload internal.Buff
"Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
pc.chunkedMsgCtxMap.remove(uuid)
pc.availablePermitsCh <- permitsInc
pc.availablePermits.inc()
return nil
}

ctx.append(chunkID, msgID, compressedPayload)

if msgMeta.GetChunkId() != msgMeta.GetNumChunksFromMsg()-1 {
pc.availablePermitsCh <- permitsInc
pc.availablePermits.inc()
return nil
}

Expand Down Expand Up @@ -1075,7 +1097,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil

// reset available permits
pc.availablePermitsCh <- permitsReset
pc.availablePermits.reset()
initialPermits := uint32(pc.queueSize)

pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
Expand All @@ -1098,15 +1120,7 @@ func (pc *partitionConsumer) dispatcher() {
messages[0] = nil
messages = messages[1:]

pc.availablePermitsCh <- permitsInc

case pr := <-pc.availablePermitsCh:
switch pr {
case permitsInc:
pc.increasePermitsAndRequestMoreIfNeed()
case permitsReset:
pc.availablePermits = 0
}
pc.availablePermits.inc()

case clearQueueCb := <-pc.clearQueueCh:
// drain the message queue on any new connection by sending a
Expand Down Expand Up @@ -1136,7 +1150,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil

// reset available permits
pc.availablePermitsCh <- permitsReset
pc.availablePermits.reset()
initialPermits := uint32(pc.queueSize)

pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
Expand Down Expand Up @@ -1576,25 +1590,7 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
if err != nil {
pc.log.Error("Connection was closed when request ack cmd")
}
pc.availablePermitsCh <- permitsInc
}

func (pc *partitionConsumer) increasePermitsAndRequestMoreIfNeed() {
// TODO implement a better flow controller
// send more permits if needed
flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
pc.availablePermits++
ap := pc.availablePermits
if ap >= flowThreshold {
availablePermits := ap
requestedPermits := ap
pc.availablePermitsCh <- permitsReset

pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
if err := pc.internalFlow(uint32(requestedPermits)); err != nil {
pc.log.WithError(err).Error("unable to send permits")
}
}
pc.availablePermits.inc()
}

// _setConn sets the internal connection field of this partition consumer atomically.
Expand Down

0 comments on commit 5679f1f

Please sign in to comment.