Skip to content

Commit

Permalink
Fix stuck when reconnect broker (apache#703)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaolongran <[email protected]>

Fixes apache#697

### Motivation

As apache#697 said, In Go SDK, when the reconnection logic is triggered under certain conditions, the reconnection will not succeed due to request timeout.

Comparing the implementation of the Java SDK, we can see that each time the reconnection logic is triggered, the original connection will be closed and a new connection will be created.

![image](https://user-images.githubusercontent.com/20965307/148906906-1cfc5c07-1836-4185-94ec-e43f5565a4a8.png)


So in this pr, we introduced a new `reconnectFlag` field in the `connection` struct to mark the reconnection state. When the broker actively informs the client to close the connection to trigger the reconnection logic, we will store it from the `connections` cache of the `connectionPool`. The old connection object is deleted, and a new connection is created to complete the reconnection

### Modifications

- Add `reconnectFlag` in `connection` struct
  • Loading branch information
wolfstudy authored Jan 12, 2022
1 parent 2bcf7c7 commit 1a8432c
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
4 changes: 4 additions & 0 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,8 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer)
consumerID := closeConsumer.GetConsumerId()
c.log.Infof("Broker notification of Closed consumer: %d", consumerID)

c.changeState(connectionClosed)

if consumer, ok := c.consumerHandler(consumerID); ok {
consumer.ConnectionClosed()
c.DeleteConsumeHandler(consumerID)
Expand All @@ -821,6 +823,8 @@ func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer)
c.log.Infof("Broker notification of Closed producer: %d", closeProducer.GetProducerId())
producerID := closeProducer.GetProducerId()

c.changeState(connectionClosed)

producer, ok := c.deletePendingProducers(producerID)
// did we find a producer?
if ok {
Expand Down
4 changes: 2 additions & 2 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
p.Lock()
conn, ok := p.connections[key]
if ok {
p.log.Debugf("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v",
p.log.Infof("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)

// remove stale/failed connection
if conn.closed() {
delete(p.connections, key)
p.log.Debugf("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v",
p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)
conn = nil // set to nil so we create a new one
}
Expand Down

0 comments on commit 1a8432c

Please sign in to comment.