Skip to content

Commit

Permalink
Fix data race will accessing connection in partitionConsumer
Browse files Browse the repository at this point in the history
The partitionConsumer maintains a few internal go-routines, two of which
access the underlying internal.Connection.  The main runEvenstLoop()
go-routine reads the connection field while a separate go-routine is used
to detect connection loss, initiate re-connection, and set the connection.

The go-routine that initiates re-connection on connection loss was added
in the following PR in order to address a deadlock:
#535

The above PR also includes the following changes:
* connection drains and fails the incomingRequestsCh when the conneciton is
  closed.
* partitionConsumer uses a separate channel to communicate connection loss
  to the re-connection go-routine.

With the above it in place it is possible for the partitionConsumer to
handle the connection loss in the main runEventsLoop(), allowing us to use
a single go-routine to manage the connection and resolve the data race.

Signed-off-by: Daniel Ferstay <[email protected]>
  • Loading branch information
Daniel Ferstay committed Jun 10, 2021
1 parent e93802c commit 17335cd
Showing 1 changed file with 9 additions and 13 deletions.
22 changes: 9 additions & 13 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,20 +781,16 @@ func (pc *partitionConsumer) runEventsLoop() {
}()
pc.log.Debug("get into runEventsLoop")

go func() {
for {
select {
case <-pc.closeCh:
return
case <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
pc.reconnectToBroker()
}
}
}()

for {
for i := range pc.eventsCh {
select {
case <-pc.closeCh:
return

case <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
pc.reconnectToBroker()

case i := <-pc.eventsCh:
switch v := i.(type) {
case *ackRequest:
pc.internalAck(v)
Expand Down

0 comments on commit 17335cd

Please sign in to comment.