Fix data race while accessing connection in partitionProducer (#701)
Signed-off-by: xiaolongran <[email protected]>

### Motivation

In #700, we use a separate goroutine to handle the reconnect logic, so we may run into the same data race problem as in #535.
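
For illustration only (not part of this commit), the sketch below reproduces this kind of race using hypothetical stand-in types: one goroutine assigns a plain connection field while another reads it, and running it with `go run -race` will typically report the unsynchronized access pair.

```go
// Hypothetical reproduction of the kind of race fixed here (illustrative
// stand-in types, not the client's real ones). `go run -race` will typically
// flag the unsynchronized read/write of the cnx field.
package main

import "sync"

type conn struct{ id int }

type producer struct {
	cnx *conn // plain field: concurrent read and write is a data race
}

func main() {
	p := &producer{cnx: &conn{id: 1}}

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // reconnect goroutine: writes the field
		defer wg.Done()
		p.cnx = &conn{id: 2}
	}()
	go func() { // send path: reads the field
		defer wg.Done()
		_ = p.cnx.id
	}()
	wg.Wait()
}
```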

### Modifications

The conn field is now read and written atomically (via atomic.Value), avoiding the race condition.
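
A minimal, self-contained sketch of that pattern is shown below; the `Connection` interface and the `fakeConn`, `setConn`, and `getConn` names are illustrative stand-ins, not the client's actual types. The reconnect goroutine publishes replacement connections through `atomic.Value` while the send path loads the current one, so neither side touches a plain field.

```go
// Minimal sketch of the fix pattern: store the connection in an atomic.Value
// so a writer (reconnect) and a reader (send path) never race on a plain field.
// Connection and fakeConn are illustrative stand-ins for the client's types.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Connection interface {
	ID() string
}

type fakeConn struct{ id string }

func (c *fakeConn) ID() string { return c.id }

type producer struct {
	// Holds a Connection; note that atomic.Value requires every stored value
	// to have the same concrete type (here always *fakeConn).
	conn atomic.Value
}

func (p *producer) setConn(c Connection) { p.conn.Store(c) }

func (p *producer) getConn() Connection { return p.conn.Load().(Connection) }

func main() {
	p := &producer{}
	p.setConn(&fakeConn{id: "conn-1"})

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // reconnect goroutine: publishes replacement connections
		defer wg.Done()
		for i := 2; i <= 5; i++ {
			p.setConn(&fakeConn{id: fmt.Sprintf("conn-%d", i)})
		}
	}()
	go func() { // send path: reads whichever connection is current
		defer wg.Done()
		for i := 0; i < 100; i++ {
			_ = p.getConn().ID()
		}
	}()
	wg.Wait()
	fmt.Println("final connection:", p.getConn().ID())
}
```

Running the two sketches with the race detector makes the difference visible: the plain-field version is typically reported, the atomic.Value version is not.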
wolfstudy authored Jan 12, 2022
1 parent ff7a962 commit 2bcf7c7
pulsar/producer_partition.go: 31 additions, 15 deletions
@@ -66,7 +66,8 @@ type partitionProducer struct {
client *client
topic string
log log.Logger
cnx internal.Connection

conn atomic.Value

options *ProducerOptions
producerName string
@@ -171,7 +172,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
"producerID": p.producerID,
})

p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
p.log.WithField("cnx", p._getConn().ID()).Info("Created producer")
p.setProducerState(producerReady)

if p.options.SendTimeout > 0 {
@@ -277,8 +278,8 @@ func (p *partitionProducer) grabCnx() error {
nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
p.sequenceIDGenerator = &nextSequenceID
}
p.cnx = res.Cnx
p.cnx.RegisterListener(p.producerID, p)
p._setConn(res.Cnx)
p._getConn().RegisterListener(p.producerID, p)
p.log.WithFields(log.Fields{
"cnx": res.Cnx.ID(),
"epoch": atomic.LoadUint64(&p.epoch),
@@ -303,7 +304,7 @@ func (p *partitionProducer) grabCnx() error {
pi.sentAt = time.Now()
pi.Unlock()
p.pendingQueue.Put(pi)
p.cnx.WriteData(pi.batchData)
p._getConn().WriteData(pi.batchData)

if pi == lastViewItem {
break
@@ -325,7 +326,7 @@ func (p *partitionProducer) GetBuffer() internal.Buffer {

func (p *partitionProducer) ConnectionClosed() {
// Trigger reconnection in the produce goroutine
p.log.WithField("cnx", p.cnx.ID()).Warn("Connection was closed")
p.log.WithField("cnx", p._getConn().ID()).Warn("Connection was closed")
p.connectClosedCh <- connectionClosed{}
}

@@ -354,7 +355,7 @@ func (p *partitionProducer) reconnectToBroker() {
err := p.grabCnx()
if err == nil {
// Successfully reconnected
p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker")
return
}
errMsg := err.Error()
@@ -432,13 +433,13 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}

// if msg is too large
if len(payload) > int(p.cnx.GetMaxMessageSize()) {
if len(payload) > int(p._getConn().GetMaxMessageSize()) {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
WithField("size", len(payload)).
WithField("properties", msg.Properties).
Errorf("MaxMessageSize %d", int(p.cnx.GetMaxMessageSize()))
Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
@@ -548,7 +549,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
sequenceID: sequenceID,
sendRequests: callbacks,
})
p.cnx.WriteData(batchData)
p._getConn().WriteData(batchData)
}

func (p *partitionProducer) failTimeoutMessages() {
@@ -681,7 +682,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
sequenceID: sequenceIDs[i],
sendRequests: callbacks[i],
})
p.cnx.WriteData(batchesData[i])
p._getConn().WriteData(batchesData[i])
}

}
@@ -790,7 +791,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
// At that point, it is better to close the connection to the broker to reconnect to a broker hoping it solves
// the state discrepancy.
p.log.Warnf("Received ack for %v although the pending queue is empty, closing connection", response.GetMessageId())
p.cnx.Close()
p._getConn().Close()
return
}

@@ -800,7 +801,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
// the state discrepancy.
p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
response.GetSequenceId(), pi.sequenceID)
p.cnx.Close()
p._getConn().Close()
return
} else if pi.sequenceID == response.GetSequenceId() {
// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
@@ -856,7 +857,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
p.log.Info("Closing producer")

id := p.client.rpcClient.NewRequestID()
_, err := p.client.rpcClient.RequestOnCnx(p.cnx, id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
ProducerId: &p.producerID,
RequestId: &id,
})
@@ -872,7 +873,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
}

p.setProducerState(producerClosed)
p.cnx.UnregisterListener(p.producerID)
p._getConn().UnregisterListener(p.producerID)
p.batchFlushTicker.Stop()

close(p.closeCh)
@@ -946,3 +947,18 @@ func (i *pendingItem) Complete() {
i.completed = true
buffersPool.Put(i.batchData)
}

// _setConn sets the internal connection field of this partition producer atomically.
// Note: should only be called by this partition producer when a new connection is available.
func (p *partitionProducer) _setConn(conn internal.Connection) {
p.conn.Store(conn)
}

// _getConn returns the internal connection field of this partition producer atomically.
// Note: should only be called by this partition producer before attempting to use the connection.
func (p *partitionProducer) _getConn() internal.Connection {
// Invariant: The conn must be non-nil for the lifetime of the partitionProducer.
// For this reason we leave this cast unchecked and panic() if the
// invariant is broken.
return p.conn.Load().(internal.Connection)
}
