-
Notifications
You must be signed in to change notification settings - Fork 345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix data race while accessing connection in partitionProducer #701
Conversation
Signed-off-by: xiaolongran <[email protected]>
cc @dferstay PTAL thanks |
Signed-off-by: xiaolongran <[email protected]>
Signed-off-by: xiaolongran <[email protected]>
@@ -800,7 +801,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) | |||
// the state discrepancy. | |||
p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(), | |||
response.GetSequenceId(), pi.sequenceID) | |||
p.cnx.Close() | |||
p._getConn().Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe clean up the cached connection when the cnx is closed? otherwise _getConn()
will return a closed connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like #703 is working on this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@freeznet Looking at the implementation of the current Close method, when triggering the logic of Close, the state of the current connection will first be set to connectionClosed. When GetConnection() is executed, if the state of the connection is detected to be closed, the connection will be removed from the cache.
func (c *connection) Close() {
c.closeOnce.Do(func() {
c.Lock()
cnx := c.cnx
c.Unlock()
c.changeState(connectionClosed)
if conn.closed() {
delete(p.connections, key)
p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)
conn = nil // set to nil so we create a new one
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Signed-off-by: xiaolongran [email protected]
Motivation
In #700, we use a separate go rutine to handle the logic of reconnect, so here you may encounter the same data race problem as #535
Modifications
Now, the conn field is read and written atomically; avoiding race conditions.