Fix data race while accessing connection in partitionProducer #701

Merged · 3 commits · Jan 12, 2022
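
The diff below swaps the producer's plain cnx field (internal.Connection) for a conn field of type atomic.Value and routes every access through the new _setConn/_getConn helpers. With a plain field, the reconnect goroutine writes p.cnx while the send, flush, and receipt-handling paths read it, which the Go race detector reports as a data race. The following stand-alone sketch shows the same pattern with illustrative names only (holder, fakeConn - none of this is the producer code); it passes go run -race, whereas an ordinary field assignment in setConn would be flagged:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fakeConn stands in for internal.Connection in this illustration.
type fakeConn struct{ id int }

// holder mirrors the pattern adopted here: the connection lives behind an
// atomic.Value, so stores from the reconnect path and loads from the send
// paths are synchronized instead of racing on a plain field.
type holder struct{ conn atomic.Value }

func (h *holder) setConn(c *fakeConn) { h.conn.Store(c) }
func (h *holder) getConn() *fakeConn  { return h.conn.Load().(*fakeConn) }

func main() {
	h := &holder{}
	h.setConn(&fakeConn{id: 0})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // stand-in for reconnectToBroker swapping connections
		defer wg.Done()
		for i := 1; i <= 1000; i++ {
			h.setConn(&fakeConn{id: i})
		}
	}()
	for r := 0; r < 4; r++ {
		wg.Add(1)
		go func() { // stand-ins for the send/flush paths reading the connection
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				_ = h.getConn().id
			}
		}()
	}
	wg.Wait()
	fmt.Println("last connection id:", h.getConn().id)
}

Note that atomic.Value panics if successive Stores use different concrete types; the sketch always stores a *fakeConn, just as the producer always stores its connection implementation.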
46 changes: 31 additions & 15 deletions pulsar/producer_partition.go
@@ -66,7 +66,8 @@ type partitionProducer struct {
client *client
topic string
log log.Logger
- cnx internal.Connection

+ conn atomic.Value

options *ProducerOptions
producerName string
@@ -171,7 +172,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
"producerID": p.producerID,
})

- p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
+ p.log.WithField("cnx", p._getConn().ID()).Info("Created producer")
p.setProducerState(producerReady)

if p.options.SendTimeout > 0 {
@@ -277,8 +278,8 @@ func (p *partitionProducer) grabCnx() error {
nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
p.sequenceIDGenerator = &nextSequenceID
}
- p.cnx = res.Cnx
- p.cnx.RegisterListener(p.producerID, p)
+ p._setConn(res.Cnx)
+ p._getConn().RegisterListener(p.producerID, p)
p.log.WithFields(log.Fields{
"cnx": res.Cnx.ID(),
"epoch": atomic.LoadUint64(&p.epoch),
@@ -303,7 +304,7 @@ func (p *partitionProducer) grabCnx() error {
pi.sentAt = time.Now()
pi.Unlock()
p.pendingQueue.Put(pi)
- p.cnx.WriteData(pi.batchData)
+ p._getConn().WriteData(pi.batchData)

if pi == lastViewItem {
break
@@ -325,7 +326,7 @@ func (p *partitionProducer) GetBuffer() internal.Buffer {

func (p *partitionProducer) ConnectionClosed() {
// Trigger reconnection in the produce goroutine
- p.log.WithField("cnx", p.cnx.ID()).Warn("Connection was closed")
+ p.log.WithField("cnx", p._getConn().ID()).Warn("Connection was closed")
p.connectClosedCh <- connectionClosed{}
}

@@ -354,7 +355,7 @@ func (p *partitionProducer) reconnectToBroker() {
err := p.grabCnx()
if err == nil {
// Successfully reconnected
- p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
+ p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker")
return
}
errMsg := err.Error()
@@ -432,13 +433,13 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}

// if msg is too large
- if len(payload) > int(p.cnx.GetMaxMessageSize()) {
+ if len(payload) > int(p._getConn().GetMaxMessageSize()) {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
WithField("size", len(payload)).
WithField("properties", msg.Properties).
Errorf("MaxMessageSize %d", int(p.cnx.GetMaxMessageSize()))
Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
@@ -548,7 +549,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
sequenceID: sequenceID,
sendRequests: callbacks,
})
- p.cnx.WriteData(batchData)
+ p._getConn().WriteData(batchData)
}

func (p *partitionProducer) failTimeoutMessages() {
@@ -681,7 +682,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
sequenceID: sequenceIDs[i],
sendRequests: callbacks[i],
})
- p.cnx.WriteData(batchesData[i])
+ p._getConn().WriteData(batchesData[i])
}

}
@@ -790,7 +791,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
// At that point, it is better to close the connection to the broker to reconnect to a broker hoping it solves
// the state discrepancy.
p.log.Warnf("Received ack for %v although the pending queue is empty, closing connection", response.GetMessageId())
- p.cnx.Close()
+ p._getConn().Close()
return
}

@@ -800,7 +801,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
// the state discrepancy.
p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
response.GetSequenceId(), pi.sequenceID)
- p.cnx.Close()
+ p._getConn().Close()
Contributor:

maybe clean up the cached connection when the cnx is closed? otherwise _getConn() will return a closed connection.

Member Author:

Looks like #703 is working on this

Member Author:

@freeznet Looking at the implementation of the current Close method: when Close is triggered, the connection's state is first set to connectionClosed. When GetConnection() is then executed, a connection whose state is detected as closed is removed from the pool's cache.

// Excerpt from connection.Close(): the state is flipped to connectionClosed first.
func (c *connection) Close() {
	c.closeOnce.Do(func() {
		c.Lock()
		cnx := c.cnx
		c.Unlock()
		c.changeState(connectionClosed)
		// ... (rest of Close elided in the quote)
	})
}

// Excerpt from the connection pool's GetConnection(): closed connections are dropped from the cache.
if conn.closed() {
	delete(p.connections, key)
	p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v",
		key, conn.logicalAddr, conn.physicalAddr)
	conn = nil // set to nil so we create a new one
}

return
} else if pi.sequenceID == response.GetSequenceId() {
// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
@@ -856,7 +857,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
p.log.Info("Closing producer")

id := p.client.rpcClient.NewRequestID()
- _, err := p.client.rpcClient.RequestOnCnx(p.cnx, id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
+ _, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
ProducerId: &p.producerID,
RequestId: &id,
})
@@ -872,7 +873,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
}

p.setProducerState(producerClosed)
- p.cnx.UnregisterListener(p.producerID)
+ p._getConn().UnregisterListener(p.producerID)
p.batchFlushTicker.Stop()

close(p.closeCh)
Expand Down Expand Up @@ -946,3 +947,18 @@ func (i *pendingItem) Complete() {
i.completed = true
buffersPool.Put(i.batchData)
}

+ // _setConn sets the internal connection field of this partition producer atomically.
+ // Note: should only be called by this partition producer when a new connection is available.
+ func (p *partitionProducer) _setConn(conn internal.Connection) {
+     p.conn.Store(conn)
+ }
+
+ // _getConn returns the internal connection field of this partition producer atomically.
+ // Note: should only be called by this partition producer before attempting to use the connection.
+ func (p *partitionProducer) _getConn() internal.Connection {
+     // Invariant: the conn must be non-nil for the lifetime of the partitionProducer.
+     // For this reason we leave this cast unchecked and panic() if the invariant is broken.
+     return p.conn.Load().(internal.Connection)
+ }
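
A note on the unchecked cast above: the doc comment pins the invariant that conn is non-nil for the producer's lifetime, so _getConn lets a broken invariant surface as a panic rather than returning an error. For comparison only, a checked variant could report the missing value instead; _getConnChecked below is hypothetical and not part of this change:

// _getConnChecked is an illustrative alternative to _getConn (not in this PR):
// it returns ok=false instead of panicking when no connection has been stored yet.
func (p *partitionProducer) _getConnChecked() (internal.Connection, bool) {
	c, ok := p.conn.Load().(internal.Connection)
	return c, ok
}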
}