GODRIVER-2335 Preemptively cancel in-progress operations when SDAM heartbeats time out. #1423
Conversation
API Change Report: ./event, compatible changes: PoolEvent.Interruption: added
	}
	threadOp := new(operation)
	if err := operationRaw.Unmarshal(threadOp); err != nil {
		return fmt.Errorf("error unmarshalling 'operation' argument: %v", err)
Suggested change:
-	return fmt.Errorf("error unmarshalling 'operation' argument: %v", err)
+	return fmt.Errorf("error unmarshaling 'operation' argument: %v", err)
That is what confuses me. I believe we've talked about its spelling in en-US vs. en-GB, but I still see a lot of "marshalling" in our code. Maybe it's worth an orthography PR.
	select {
	case <-ch:
		return nil
	case <-time.After(10 * time.Second):
What is the significance of 10 seconds here? Can we constantize this value, akin to waitForEventTimeout?
According to the specs:
If the waitForThread operation is not satisfied after 10 seconds, this operation MUST cause a test failure.
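For readability, that spec requirement could be captured in a named constant, akin to waitForEventTimeout. A minimal sketch, with a constant and helper name of my own choosing rather than the PR's actual code:

// waitForThreadTimeout mirrors the unified test format requirement that a
// waitForThread operation must fail the test after 10 seconds.
const waitForThreadTimeout = 10 * time.Second

// waitForThread blocks until the thread signals completion on ch or the
// spec-mandated timeout elapses.
func waitForThread(ch <-chan error) error {
	select {
	case err := <-ch:
		return err
	case <-time.After(waitForThreadTimeout):
		return fmt.Errorf("timed out after %v waiting for thread to complete", waitForThreadTimeout)
	}
}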
@@ -32,6 +32,10 @@ var (
	"Write commands with snapshot session do not affect snapshot reads": "Test fails frequently. See GODRIVER-2843",
	// TODO(GODRIVER-2943): Fix and unskip this test case.
	"Topology lifecycle": "Test times out. See GODRIVER-2943",
	"Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout",
Can we add a comment above these tests with a TODO to resolve or a more concise explanation as to why we are skipping them?
Not running these tests avoids ever executing the runOnThread logic.
@qingyang-hu So is the reason we skip these that the cases don't apply to the Go Driver?
Correct.
@@ -568,6 +568,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
	switch strings.ToLower(key) {
	case "appname":
		clientOpts.SetAppName(value.(string))
	case "connecttimeoutms":
		clientOpts.SetConnectTimeout(time.Duration(value.(int32)) * time.Microsecond)
Should this conversion be time.Millisecond? connectTimeoutMS is a millisecond value.
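For reference, the corrected case arm would presumably look like the excerpt below. This is a sketch only; it assumes the value arrives as an int32 count of milliseconds, as the option name suggests.

	case "connecttimeoutms":
		// connectTimeoutMS is expressed in milliseconds, so scale by
		// time.Millisecond rather than time.Microsecond.
		clientOpts.SetConnectTimeout(time.Duration(value.(int32)) * time.Millisecond)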
@@ -48,6 +48,8 @@ type connection struct {
	// - atomic bug: https://pkg.go.dev/sync/atomic#pkg-note-BUG
	// - suggested layout: https://go101.org/article/memory-layout.html
	state int64
	inUse bool
	err   error
Is there any concern about these values being written/read concurrently? Do we need to have an error here? The only place this error is set is in clearAll, where it is poolClearedError{err: fmt.Errorf("interrupted"), address: p.address}.
I don't see a concurrency issue in the current code. inUse is set in checkin and checkout, and the value is only read by the interruption path between checkin and checkout.
The err is optional, merely to indicate the ConnectionError from reading/writing more clearly.
The clear or clearAll method may be called concurrently with checkIn/checkOut, leading to a concurrent read/write of inUse.
Some possible solutions:
1. Make inUse an atomic.Bool (sketched below).
2. Keep a set of in-use connections, similar to how we keep a set of all connections and idle connections.
3. Close all connections if interruptInUseConnections=true (i.e. don't try to distinguish between in-use and idle).
(1) is the smallest change from the current implementation. However, I think we should seriously consider (3) because it doesn't seem like the runtime optimization of lazy-closing idle connections (when we're about to force-close in-use connections) is worth the significant increase in code complexity necessary to keep track of which connections are in use.
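A rough sketch of option (1), assuming Go 1.19+ for sync/atomic's Bool type; the helper names are illustrative, not the PR's actual code:

import "sync/atomic"

type connection struct {
	// ...existing fields...

	// inUse is written by checkOut/checkIn and read by the pool-clearing
	// path, so it must be safe for concurrent access.
	inUse atomic.Bool
}

// markInUse and isInUse are hypothetical helpers showing the call sites.
func (c *connection) markInUse(v bool) { c.inUse.Store(v) }
func (c *connection) isInUse() bool    { return c.inUse.Load() }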
x/mongo/driver/topology/pool.go
Outdated
	p.clearImpl(err, serviceID, nil)
}

func (p *pool) clearImpl(err error, serviceID *primitive.ObjectID, interruptionCallback func()) {
Is there a need to do this with a callback? There seems to be only one implementation of the callback, so I think it would be more robust to combine all of this logic into one function, which would also be closer to the logic in the specification:
clear(interruptInUseConnections: Optional<Boolean>): void;
Something like this:
func (p *pool) clear(err error, serviceID *primitive.ObjectID, interruptInUseConnections bool) {
	// (existing logic)
	p.removePerishedConns()
	if interruptInUseConnections {
		interuptConnections(p)
	}
	// (continue existing logic)
}
Where interuptConnections is this:
func interuptConnections(p *pool) {
	for _, conn := range p.conns {
		if !conn.inUse || !p.stale(conn) {
			continue
		}
		_ = conn.closeWithErr(poolClearedError{
			err:     fmt.Errorf("interrupted"),
			address: p.address,
		})
		_ = p.checkInWithCallback(conn, func() (reason, bool) {
			if mustLogPoolMessage(p) {
				keysAndValues := logger.KeyValues{
					logger.KeyDriverConnectionID, conn.driverConnectionID,
				}
				logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...)
			}
			if p.monitor != nil {
				p.monitor.Event(&event.PoolEvent{
					Type:         event.ConnectionCheckedIn,
					ConnectionID: conn.driverConnectionID,
					Address:      conn.addr.String(),
				})
			}
			r, ok := connectionPerished(conn)
			if ok {
				r = reason{
					loggerConn: logger.ReasonConnClosedStale,
					event:      event.ReasonStale,
				}
			}
			return r, ok
		})
	}
}
You are right. We can avoid the callback for cleaner code. However, I'd like to keep both clear() and clearAll() to correspond with the optional interruptInUseConnections flag in the specs, considering the lack of an idiom for optional function parameters in Go. Moreover, interruption is only used a couple of times, while clear() already appears in many places, so it does not seem like a good idea to pass interruptInUseConnections everywhere clear() is called.
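For illustration, the shape being agreed on might look roughly like this, assuming clearImpl is changed to take a bool rather than a callback; the bodies are a sketch, not the final code:

// clear keeps its existing signature for the many current call sites.
func (p *pool) clear(err error, serviceID *primitive.ObjectID) {
	p.clearImpl(err, serviceID, false)
}

// clearAll additionally interrupts connections that are checked out,
// matching the spec's optional interruptInUseConnections flag.
func (p *pool) clearAll(err error, serviceID *primitive.ObjectID) {
	p.clearImpl(err, serviceID, true)
}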
@@ -32,6 +32,11 @@ var (
	"Write commands with snapshot session do not affect snapshot reads": "Test fails frequently. See GODRIVER-2843",
	// TODO(GODRIVER-2943): Fix and unskip this test case.
	"Topology lifecycle": "Test times out. See GODRIVER-2943",
	// The current logic, which was implemented with GODRIVER-2577, only clears pools and cancels in-progress ops if
	// the heartbeat fails twice. Therefore, we skip the following spec tests, which require canceling ops immediately.
	"Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout",
Since we skip these spec tests, the new "cancel in-progress operations" feature seems untested by any new or existing tests. We should add tests that make sure the feature works.
I'm going to update the test cases for GODRIVER-2577 in server_test.go to sync up with the changes.
x/mongo/driver/topology/pool.go
Outdated
w.connOpts = append(w.connOpts, func(cfg *connectionConfig) {
	cfg.inUse = true
})
Setting inUse here via a connection option seems redundant with setting it in the defer func() above. Why do we need to do it in both places?
x/mongo/driver/topology/pool.go
Outdated
if conn.inUse && p.stale(conn) {
	_ = conn.closeWithErr(poolClearedError{
Optional: Consider inverting this logic and de-indenting the code below.
E.g.
if !conn.inUse || !p.stale(conn) {
	continue
}
_ = conn.closeWithErr(...)
...
@@ -825,12 +861,58 @@ func (p *pool) checkInNoEvent(conn *connection) error {
	return nil
}

// clearAll does the same as the "clear" method and interrupts all in-use connections as well.
func (p *pool) clearAll(err error, serviceID *primitive.ObjectID) {
Optional: Consider the more accurate name clearInUse.
Note: this recommendation only applies if we're actually trying to clear only in-use connections. If we decide to clear all connections, then this name makes sense.
c.closeConnectContext()
c.wait() // Make sure that the connection has finished connecting.
It's somewhat surprising that closeWithErr waits for the connection to finish establishing while close does not. Should we move these to close?
go func(op *operation) {
	err := op.execute(ctx, loopDone)
	ch <- err
}(threadOp)
This style of running operations on a "thread" doesn't guarantee that operations will be run in the specified order. The runOnThread operation in the unified test format spec fails to mention that property, but my understanding is that unified spec test runners must preserve the order of ops run in a "thread".
The legacy "runOnThread" implementation here uses a job queue per "thread". Consider using a similar approach and/or copying the code from the legacy spec test runner.
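A sketch of a per-"thread" job queue that preserves submission order, in the spirit of the legacy runner; the testThread type and its methods are hypothetical names, not code from this PR:

type testThread struct {
	jobs chan func()
	done chan struct{}
}

func newTestThread() *testThread {
	t := &testThread{
		jobs: make(chan func(), 16),
		done: make(chan struct{}),
	}
	go func() {
		defer close(t.done)
		// Run queued operations one at a time, in the order they were enqueued.
		for job := range t.jobs {
			job()
		}
	}()
	return t
}

// run enqueues an operation; operations on the same thread never interleave.
func (t *testThread) run(job func()) { t.jobs <- job }

// stop closes the queue and waits for all queued operations to finish.
func (t *testThread) stop() {
	close(t.jobs)
	<-t.done
}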
x/mongo/driver/topology/pool.go
Outdated
if atomic.LoadInt64(&p.generation.state) == generationDisconnected {
	return true
}
Optional: This logic isn't introduced here, but it is confusing. The generation state appears to be "disconnected" only when the pool is "closed", and it is only used in pool.stale. Consider replacing this line with a check for whether the pool is closed and removing the state field from poolGenerationMap.
E.g.
p.stateMu.RLock()
if p.state == poolClosed {
	p.stateMu.RUnlock()
	return true
}
p.stateMu.RUnlock()
The p.generation is a substitute for the pool state because p.generation.disconnect() is called in *pool.close(), and locking stateMu there could cause a deadlock.
x/mongo/driver/topology/pool.go
Outdated
	err:     fmt.Errorf("interrupted"),
	address: p.address,
})
_ = p.checkInWithCallback(conn, func() (reason, bool) {
Checking in the connection to close it can be dangerous. If there is ever a case where the logic to check for perished connections has a bug, we may check-in an in-use connection, which could lead to data corruption. We should close the connections directly here and not rely on check-in to close them.
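A rough sketch of that suggestion, closing interrupted connections directly instead of routing them through check-in; untrackInterrupted stands in for whatever bookkeeping the pool actually needs and is not a real helper in the driver:

for _, conn := range p.conns {
	if !conn.inUse || !p.stale(conn) {
		continue
	}
	// Close directly; never hand an in-use connection back through check-in.
	_ = conn.closeWithErr(poolClearedError{
		err:     fmt.Errorf("interrupted"),
		address: p.address,
	})
	// Hypothetical: remove the connection from the pool's tracking structures
	// here rather than relying on check-in's perished-connection path.
	p.untrackInterrupted(conn)
}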
x/mongo/driver/topology/pool.go
Outdated
	logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...)
}

if p.monitor != nil {
	p.monitor.Event(&event.PoolEvent{
		Type: event.ConnectionCheckedIn,
I don't think the spec requires that check-in logs/events are emitted when closing in-use connections. If we stop using check-in to close connections (recommended above), we should also not emit these logs/events.
x/mongo/driver/topology/pool.go
Outdated
r, perished := connectionPerished(conn)
if !perished && conn.pool.getState() == poolClosed {
	perished = true
	r = reason{
		loggerConn: logger.ReasonConnClosedPoolClosed,
		event:      event.ReasonPoolClosed,
	}
}
return r, perished
I'm concerned about the amount of duplicate code the checkInWithCallback function creates. There are currently multiple sources of truth for check-in logging, events, and perished-connection behavior, depending on which function is called. We should find a way to remove the duplicated code.
x/mongo/driver/topology/pool.go
Outdated
event := &event.PoolEvent{
	Type:      event.PoolCleared,
	Address:   p.address.String(),
	ServiceID: serviceID,
	Error:     err,
}
if interruptAllConnections {
	event.Interruption = true
}
Optional: No need for a conditional block here.
Suggested change:
event := &event.PoolEvent{
	Type:         event.PoolCleared,
	Address:      p.address.String(),
	ServiceID:    serviceID,
	Error:        err,
	Interruption: interruptAllConnections,
}
generation, _ := server.pool.generation.getGeneration(&serviceID)
assert.Eventuallyf(t,
	func() bool {
		generation := server.pool.generation.getGeneration(&serviceID)
		generation, _ := server.pool.generation.getGeneration(&serviceID)
		numConns := server.pool.generation.getNumConns(&serviceID)
		return generation == wantGeneration && numConns == wantNumConns
	},
	100*time.Millisecond,
	1*time.Millisecond,
	"expected generation number %v, got %v; expected connection count %v, got %v",
	wantGeneration,
	server.pool.generation.getGeneration(&serviceID),
	generation,
Pre-fetching the generation number will lead to confusing test failure messages because we expect it to be updated concurrently while assert.Eventuallyf is running. We want the error message to show what the generation is when the assertion fails.
A possibly simpler approach is to log the state of the system when the assertion fails. For example:
assert.Eventuallyf(t,
	func() bool {
		generation, _ := server.pool.generation.getGeneration(&serviceID)
		numConns := server.pool.generation.getNumConns(&serviceID)
		match := generation == wantGeneration && numConns == wantNumConns
		if !match {
			t.Logf("Waiting for generation number %v, got %v", wantGeneration, generation)
			t.Logf("Waiting for connection count %v, got %v", wantNumConns, numConns)
		}
		return match
	},
	100*time.Millisecond,
	10*time.Millisecond,
	"expected generation number and connection count never matched")
Note that the example also changes the check interval from 1ms to 10ms to reduce log noise in the case there is a failure.
Looks good! 👍
GODRIVER-2335 Preemptively cancel in-progress operations when SDAM heartbeats time out. (mongodb#1423)
GODRIVER-2335
Summary
Preemptively cancel in-progress operations when SDAM heartbeats time out.
Background & Motivation
ClearAll is provided to interrupt any in-use connections as part of the clearing.
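As a usage illustration (not part of the PR), an application could observe the new PoolEvent.Interruption flag through a PoolMonitor roughly like this; the URI and logging are placeholders:

monitor := &event.PoolMonitor{
	Event: func(evt *event.PoolEvent) {
		// Interruption is set when the pool was cleared with in-use
		// connections interrupted, e.g. after a heartbeat timeout.
		if evt.Type == event.PoolCleared && evt.Interruption {
			log.Printf("pool %s cleared and in-use connections interrupted: %v", evt.Address, evt.Error)
		}
	},
}

client, err := mongo.Connect(context.Background(),
	options.Client().ApplyURI("mongodb://localhost:27017").SetPoolMonitor(monitor))
if err != nil {
	log.Fatal(err)
}
defer func() { _ = client.Disconnect(context.Background()) }()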