Skip to content

Commit

Permalink
[release-21.0] pool: reopen connection closed by idle timeout (#17818)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshit-gangal authored Feb 20, 2025
1 parent 30b83eb commit f045a01
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 19 deletions.
24 changes: 19 additions & 5 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ type ConnPool[C Connection] struct {
connect Connector[C]
// refresh is the callback to check whether the pool needs to be refreshed
refresh RefreshCheck

// maxCapacity is the maximum value to which capacity can be set; when the pool
// is re-opened, it defaults to this capacity
maxCapacity int64
Expand Down Expand Up @@ -380,6 +379,8 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {

if conn == nil {
var err error
// Using context.Background() is fine since MySQL connection already enforces
// a connect timeout via the `db_connect_timeout_ms` config param.
conn, err = pool.connNew(context.Background())
if err != nil {
pool.closedConn()
Expand All @@ -392,6 +393,8 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
if lifetime > 0 && conn.timeCreated.elapsed() > lifetime {
pool.Metrics.maxLifetimeClosed.Add(1)
conn.Close()
// Using context.Background() is fine since MySQL connection already enforces
// a connect timeout via the `db_connect_timeout_ms` config param.
if err := pool.connReopen(context.Background(), conn, conn.timeUsed.get()); err != nil {
pool.closedConn()
return
Expand Down Expand Up @@ -455,15 +458,22 @@ func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
return time.Duration(maxLifetime) + time.Duration(rand.Uint32N(uint32(maxLifetime)))
}

func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) error {
var err error
func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Duration) (err error) {
dbconn.Conn, err = pool.config.connect(ctx)
if err != nil {
return err
}

dbconn.timeUsed.set(now)
if setting := dbconn.Conn.Setting(); setting != nil {
err = dbconn.Conn.ApplySetting(ctx, setting)
if err != nil {
dbconn.Close()
return err
}
}

dbconn.timeCreated.set(now)
dbconn.timeUsed.set(now)
return nil
}

Expand Down Expand Up @@ -720,7 +730,11 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
if conn.timeUsed.expired(mono, timeout) {
pool.Metrics.idleClosed.Add(1)
conn.Close()
pool.closedConn()
// Using context.Background() is fine since MySQL connection already enforces
// a connect timeout via the `db_connect_timeout_ms` config param.
if err := pool.connReopen(context.Background(), conn, mono); err != nil {
pool.closedConn()
}
}
}
}
Expand Down
73 changes: 59 additions & 14 deletions go/pools/smartconnpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ var (

type TestState struct {
lastID, open, close, reset atomic.Int64
waits []time.Time
mu sync.Mutex

chaos struct {
waits []time.Time
chaos struct {
delayConnect time.Duration
failConnect bool
failConnect atomic.Bool
failApply bool
}
}
Expand Down Expand Up @@ -109,7 +108,7 @@ func newConnector(state *TestState) Connector[*TestConn] {
if state.chaos.delayConnect != 0 {
time.Sleep(state.chaos.delayConnect)
}
if state.chaos.failConnect {
if state.chaos.failConnect.Load() {
return nil, fmt.Errorf("failed to connect: forced failure")
}
return &TestConn{
Expand Down Expand Up @@ -586,6 +585,45 @@ func TestUserClosing(t *testing.T) {
}
}

// TestConnReopen verifies that the pool transparently replaces connections
// that are closed for exceeding MaxLifetime (on put) or IdleTimeout (by the
// background idle closer), and that it stops replacing them once new
// connection attempts start to fail.
func TestConnReopen(t *testing.T) {
	var state TestState

	p := NewPool(&Config[*TestConn]{
		Capacity:    1,
		IdleTimeout: 200 * time.Millisecond,
		MaxLifetime: 10 * time.Millisecond,
		LogWait:     state.LogWait,
	}).Open(newConnector(&state), nil)
	defer p.Close()

	conn, err := p.Get(context.Background(), nil)
	require.NoError(t, err)
	assert.EqualValues(t, 1, state.lastID.Load())
	assert.EqualValues(t, 1, p.Active())

	// Sleep past MaxLifetime so the put below must close the expired
	// connection and open a replacement (lastID advances to 2).
	time.Sleep(50 * time.Millisecond)

	p.put(conn)
	assert.EqualValues(t, 2, state.lastID.Load())
	assert.EqualValues(t, 1, p.Active())

	// Sleep past IdleTimeout: the idle closer should close the idle
	// connection and immediately reopen a fresh one, keeping the pool
	// at capacity instead of draining it.
	time.Sleep(300 * time.Millisecond)
	assert.GreaterOrEqual(t, state.lastID.Load(), int64(3))
	assert.EqualValues(t, 1, p.Active())
	assert.GreaterOrEqual(t, p.Metrics.IdleClosed(), int64(1))

	// Force connection attempts to fail. After the next idle timeout the
	// reopen fails, so the pool must account the connection as closed and
	// end up with no active connections.
	state.chaos.failConnect.Store(true)
	time.Sleep(300 * time.Millisecond)
	assert.Zero(t, p.Active())
}

func TestIdleTimeout(t *testing.T) {
testTimeout := func(t *testing.T, setting *Setting) {
var state TestState
Expand All @@ -608,6 +646,7 @@ func TestIdleTimeout(t *testing.T) {

conns = append(conns, r)
}
assert.GreaterOrEqual(t, state.open.Load(), int64(5))

// wait a long while; ensure that none of the conns have been closed
time.Sleep(1 * time.Second)
Expand All @@ -628,9 +667,15 @@ func TestIdleTimeout(t *testing.T) {
t.Fatalf("Connections remain open after 1 second")
}
}
// At least 5 connections should have been closed by now.
assert.GreaterOrEqual(t, p.Metrics.IdleClosed(), int64(5), "At least 5 connections should have been closed by now.")

// At any point, at least 4 connections should be open, with 1 either in the process of opening or already opened.
// The idle connection closer shuts down one connection at a time.
assert.GreaterOrEqual(t, state.open.Load(), int64(4))

// no need to assert anything: all the connections in the pool should are idle-closed
// now and if they're not the test will timeout and fail
// The number of available connections in the pool should remain at 5.
assert.EqualValues(t, 5, p.Available())
}

t.Run("WithoutSettings", func(t *testing.T) { testTimeout(t, nil) })
Expand All @@ -656,7 +701,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) {
// Change the factory before putting back
// to prevent race with the idle closer, who will
// try to use it.
state.chaos.failConnect = true
state.chaos.failConnect.Store(true)
p.put(r)
timeout := time.After(1 * time.Second)
for p.Active() != 0 {
Expand All @@ -667,7 +712,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) {
}
}
// reset factory for next run.
state.chaos.failConnect = false
state.chaos.failConnect.Store(false)
}
}

Expand Down Expand Up @@ -758,7 +803,7 @@ func TestExtendedLifetimeTimeout(t *testing.T) {

func TestCreateFail(t *testing.T) {
var state TestState
state.chaos.failConnect = true
state.chaos.failConnect.Store(true)

ctx := context.Background()
p := NewPool(&Config[*TestConn]{
Expand Down Expand Up @@ -805,12 +850,12 @@ func TestCreateFailOnPut(t *testing.T) {
require.NoError(t, err)

// change factory to fail the put.
state.chaos.failConnect = true
state.chaos.failConnect.Store(true)
p.put(nil)
assert.Zero(t, p.Active())

// change back for next iteration.
state.chaos.failConnect = false
state.chaos.failConnect.Store(false)
}
}

Expand All @@ -828,7 +873,7 @@ func TestSlowCreateFail(t *testing.T) {
LogWait: state.LogWait,
}).Open(newConnector(&state), nil)

state.chaos.failConnect = true
state.chaos.failConnect.Store(true)

for i := 0; i < 3; i++ {
go func() {
Expand All @@ -847,7 +892,7 @@ func TestSlowCreateFail(t *testing.T) {
default:
}

state.chaos.failConnect = false
state.chaos.failConnect.Store(false)
conn, err := p.Get(ctx, setting)
require.NoError(t, err)

Expand Down

0 comments on commit f045a01

Please sign in to comment.