Skip to content

Commit

Permalink
rpc: notify callers to Connect when redial occurs before first heartbeat
Browse files Browse the repository at this point in the history
Before this patch callers would be left blocked on connect forever.

Fixes #41521.

Release note (bug fix): Fix bug whereby rapid network disconnections can
lead to cluster unavailability because goroutines remain blocked waiting
for a connection which will never be initialized to send its first
heartbeat.
  • Loading branch information
ajwerner committed Nov 5, 2019
1 parent a4d88c2 commit 508ffe7
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 10 deletions.
22 changes: 12 additions & 10 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func NewServerWithInterceptor(

type heartbeatResult struct {
everSucceeded bool // true if the heartbeat has ever succeeded
err error // heartbeat error. should not be nil if everSucceeded is false
err error // heartbeat error, initialized to ErrNotHeartbeated
}

// Connection is a wrapper around grpc.ClientConn. It prevents the underlying
Expand All @@ -298,8 +298,7 @@ type Connection struct {
initialHeartbeatDone chan struct{} // closed after first heartbeat
stopper *stop.Stopper

initOnce sync.Once
validatedOnce sync.Once
initOnce sync.Once
}

func newConnection(stopper *stop.Stopper) *Connection {
Expand Down Expand Up @@ -330,17 +329,12 @@ func (c *Connection) Connect(ctx context.Context) (*grpc.ClientConn, error) {
// If connection is invalid, return latest heartbeat error.
h := c.heartbeatResult.Load().(heartbeatResult)
if !h.everSucceeded {
// If we've never succeeded, h.err will be ErrNotHeartbeated.
return nil, netutil.NewInitialHeartBeatFailedError(h.err)
}
return c.grpcConn, nil
}

func (c *Connection) setInitialHeartbeatDone() {
c.validatedOnce.Do(func() {
close(c.initialHeartbeatDone)
})
}

// Health returns an error indicating the success or failure of the
// connection's latest heartbeat. Returns ErrNotHeartbeated if the
// first heartbeat has not completed.
Expand Down Expand Up @@ -741,6 +735,14 @@ func (ctx *Context) ConnHealth(target string) error {
func (ctx *Context) runHeartbeat(
conn *Connection, target string, redialChan <-chan struct{},
) error {
initialHeartbeatDone := false
setInitialHeartbeatDone := func() {
if !initialHeartbeatDone {
close(conn.initialHeartbeatDone)
initialHeartbeatDone = true
}
}
defer setInitialHeartbeatDone()
maxOffset := ctx.LocalClock.MaxOffset()
clusterID := ctx.ClusterID.Get()

Expand Down Expand Up @@ -818,7 +820,7 @@ func (ctx *Context) runHeartbeat(
everSucceeded: everSucceeded,
err: err,
})
conn.setInitialHeartbeatDone()
setInitialHeartbeatDone()

heartbeatTimer.Reset(ctx.heartbeatInterval)
}
Expand Down
59 changes: 59 additions & 0 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,65 @@ func TestVersionCheckBidirectional(t *testing.T) {
}
}

// This test ensures that clients cannot be left waiting on
// `Connection.Connect()` calls in the rare case where a heartbeat loop
// exits before attempting to send its first heartbeat. See #41521.
func TestRunHeartbeatSetsHeartbeatStateWhenExitingBeforeFirstHeartbeat(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(timeutil.Unix(0, 20).UnixNano, time.Nanosecond)

// This test reaches into low-level implementation details to recreate
// the hazardous scenario seen in #41521. In that isse we saw a runHeartbeat()
// loop exit prior to sending the first heartbeat. To recreate that scenario
// which seems difficult to create now that gRPC backs off redialing, we
// launch the runHeartbeat() loop with an already closed redial chan.
// In order to hit predictable errors we run an actual server on the other
// side of the Connection passed to runHeartbeat().
//
// At least half of the time this test will hit the case where the select
// in runHeartbeat detects the closed redial chan and returns. The
// correctness criteria we're trying to verify is that the Connect call
// below does not block.

rpcCtx := newTestContext(clock, stopper)
serverCtx := newTestContext(clock, stopper)

s := NewServer(serverCtx)
ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr)
if err != nil {
t.Fatal(err)
}
remoteAddr := ln.Addr().String()

c := newConnection(stopper)

redialChan := make(chan struct{})
close(redialChan)

c.grpcConn, _, c.dialErr = rpcCtx.GRPCDialRaw(remoteAddr)
if err != nil {
t.Fatal(c.dialErr)
}
// It is possible that the redial chan being closed is not seen on the first
// pass through the loop.
err = rpcCtx.runHeartbeat(c, "", redialChan)
if !testutils.IsError(err, grpcutil.ErrCannotReuseClientConn.Error()) {
t.Fatalf("expected %v, got %v", grpcutil.ErrCannotReuseClientConn, err)
}
// Even when the runHeartbeat returns, we could have heartbeated successfully.
// If we did not, then we expect the `not yet heartbeated` error.
if _, err = c.Connect(ctx); err != nil && !testutils.IsError(err, "not yet heartbeated") {
t.Fatalf("expected either no error or \"not yet heartbeated\", got %v", err)
}
if err := c.grpcConn.Close(); err != nil {
t.Fatal(err)
}
}

func BenchmarkGRPCDial(b *testing.B) {
if testing.Short() {
b.Skip("TODO: fix benchmark")
Expand Down

0 comments on commit 508ffe7

Please sign in to comment.