diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3d627584e8..8fdcfdb666 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -66,7 +66,8 @@ type partitionProducer struct {
 	client *client
 	topic  string
 	log    log.Logger
-	cnx    internal.Connection
+
+	conn atomic.Value
 
 	options             *ProducerOptions
 	producerName        string
@@ -171,7 +172,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		"producerID": p.producerID,
 	})
 
-	p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
+	p.log.WithField("cnx", p._getConn().ID()).Info("Created producer")
 	p.setProducerState(producerReady)
 
 	if p.options.SendTimeout > 0 {
@@ -277,8 +278,8 @@ func (p *partitionProducer) grabCnx() error {
 		nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
 		p.sequenceIDGenerator = &nextSequenceID
 	}
-	p.cnx = res.Cnx
-	p.cnx.RegisterListener(p.producerID, p)
+	p._setConn(res.Cnx)
+	p._getConn().RegisterListener(p.producerID, p)
 	p.log.WithFields(log.Fields{
 		"cnx":   res.Cnx.ID(),
 		"epoch": atomic.LoadUint64(&p.epoch),
@@ -303,7 +304,7 @@ func (p *partitionProducer) grabCnx() error {
 		pi.sentAt = time.Now()
 		pi.Unlock()
 		p.pendingQueue.Put(pi)
-		p.cnx.WriteData(pi.batchData)
+		p._getConn().WriteData(pi.batchData)
 
 		if pi == lastViewItem {
 			break
@@ -325,7 +326,7 @@ func (p *partitionProducer) GetBuffer() internal.Buffer {
 
 func (p *partitionProducer) ConnectionClosed() {
 	// Trigger reconnection in the produce goroutine
-	p.log.WithField("cnx", p.cnx.ID()).Warn("Connection was closed")
+	p.log.WithField("cnx", p._getConn().ID()).Warn("Connection was closed")
 	p.connectClosedCh <- connectionClosed{}
 }
 
@@ -354,7 +355,7 @@ func (p *partitionProducer) reconnectToBroker() {
 		err := p.grabCnx()
 		if err == nil {
 			// Successfully reconnected
-			p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
+			p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker")
 			return
 		}
 		errMsg := err.Error()
@@ -432,13 +433,13 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 
 	// if msg is too large
-	if len(payload) > int(p.cnx.GetMaxMessageSize()) {
+	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
 			WithField("size", len(payload)).
 			WithField("properties", msg.Properties).
-			Errorf("MaxMessageSize %d", int(p.cnx.GetMaxMessageSize()))
+			Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
 		p.metrics.PublishErrorsMsgTooLarge.Inc()
 		return
 	}
@@ -548,7 +549,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 		sequenceID:   sequenceID,
 		sendRequests: callbacks,
 	})
-	p.cnx.WriteData(batchData)
+	p._getConn().WriteData(batchData)
 }
 
 func (p *partitionProducer) failTimeoutMessages() {
@@ -681,7 +682,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
 			sequenceID:   sequenceIDs[i],
 			sendRequests: callbacks[i],
 		})
-		p.cnx.WriteData(batchesData[i])
+		p._getConn().WriteData(batchesData[i])
 	}
 
 }
@@ -790,7 +791,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 		// At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves
 		// the state discrepancy.
 		p.log.Warnf("Received ack for %v although the pending queue is empty, closing connection", response.GetMessageId())
-		p.cnx.Close()
+		p._getConn().Close()
 		return
 	}
 
@@ -800,7 +801,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 		// the state discrepancy.
 		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
 			response.GetSequenceId(), pi.sequenceID)
-		p.cnx.Close()
+		p._getConn().Close()
 		return
 	} else if pi.sequenceID == response.GetSequenceId() {
 		// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
@@ -856,7 +857,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
 	p.log.Info("Closing producer")
 
 	id := p.client.rpcClient.NewRequestID()
-	_, err := p.client.rpcClient.RequestOnCnx(p.cnx, id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
+	_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
 		ProducerId: &p.producerID,
 		RequestId:  &id,
 	})
@@ -872,7 +873,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
 	}
 
 	p.setProducerState(producerClosed)
-	p.cnx.UnregisterListener(p.producerID)
+	p._getConn().UnregisterListener(p.producerID)
 	p.batchFlushTicker.Stop()
 
 	close(p.closeCh)
@@ -946,3 +947,18 @@ func (i *pendingItem) Complete() {
 	i.completed = true
 	buffersPool.Put(i.batchData)
 }
+
+// _setConn sets the internal connection field of this partition producer atomically.
+// Note: should only be called by this partition producer when a new connection is available.
+func (p *partitionProducer) _setConn(conn internal.Connection) {
+	p.conn.Store(conn)
+}
+
+// _getConn returns the internal connection field of this partition producer atomically.
+// Note: should only be called by this partition producer before attempting to use the connection.
+func (p *partitionProducer) _getConn() internal.Connection {
+	// Invariant: the conn must be non-nil for the lifetime of the partitionProducer.
+	// For this reason we leave this cast unchecked and panic() if the
+	// invariant is broken.
+	return p.conn.Load().(internal.Connection)
+}
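
For readers less familiar with this pattern: the new accessors wrap sync/atomic's atomic.Value so that the produce goroutine can swap in a new connection during reconnect while other goroutines (send path, receipt handling, close) read it without a data race. The sketch below is a minimal, standalone illustration of that pattern, not code from this repository; the Connection interface, fakeConn, and producer types are hypothetical stand-ins for internal.Connection and partitionProducer.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Connection is a hypothetical stand-in for internal.Connection.
type Connection interface {
	ID() string
}

type fakeConn struct{ id string }

func (c *fakeConn) ID() string { return c.id }

type producer struct {
	conn atomic.Value // holds a Connection once _setConn has run at least once
}

// _setConn stores the new connection. atomic.Value requires every Store to use the
// same dynamic type; here that is always *fakeConn, just as the real producer always
// stores the same connection implementation.
func (p *producer) _setConn(c Connection) {
	p.conn.Store(c)
}

// _getConn uses an unchecked type assertion: if _setConn was never called, Load()
// returns nil and the assertion panics, mirroring the invariant stated in the patch.
func (p *producer) _getConn() Connection {
	return p.conn.Load().(Connection)
}

func main() {
	p := &producer{}
	p._setConn(&fakeConn{id: "cnx-1"})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Simulates reconnectToBroker swapping in a fresh connection.
		p._setConn(&fakeConn{id: "cnx-2"})
	}()

	// Simulates a concurrent reader such as internalSend or ReceivedSendReceipt.
	fmt.Println(p._getConn().ID()) // prints "cnx-1" or "cnx-2"; no data race either way

	wg.Wait()
}

Compared with guarding the field behind a mutex, atomic.Value keeps the read side of the hot send path lock-free; the trade-off is the unchecked assertion, which is only safe because grabCnx stores a connection before the producer is handed out, as the invariant comment in the patch notes.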