From 508ffe75fe6911b22d968648174a25519ba6a856 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Fri, 11 Oct 2019 10:23:51 -0400 Subject: [PATCH] rpc: notify callers to Connect when redial occurs before first heartbeat Before this patch callers would be left blocked on connect forever. Fixes #41521. Release note (bug fix): Fix bug whereby rapid network disconnections can lead to cluster unavailability because goroutines remain blocked waiting for a connection which will never be initialized to send its first heartbeat. --- pkg/rpc/context.go | 22 ++++++++------- pkg/rpc/context_test.go | 59 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 10 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 4e06bb8d7efe..114358b2a15c 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -286,7 +286,7 @@ func NewServerWithInterceptor( type heartbeatResult struct { everSucceeded bool // true if the heartbeat has ever succeeded - err error // heartbeat error. should not be nil if everSucceeded is false + err error // heartbeat error, initialized to ErrNotHeartbeated } // Connection is a wrapper around grpc.ClientConn. It prevents the underlying @@ -298,8 +298,7 @@ type Connection struct { initialHeartbeatDone chan struct{} // closed after first heartbeat stopper *stop.Stopper - initOnce sync.Once - validatedOnce sync.Once + initOnce sync.Once } func newConnection(stopper *stop.Stopper) *Connection { @@ -330,17 +329,12 @@ func (c *Connection) Connect(ctx context.Context) (*grpc.ClientConn, error) { // If connection is invalid, return latest heartbeat error. h := c.heartbeatResult.Load().(heartbeatResult) if !h.everSucceeded { + // If we've never succeeded, h.err will be ErrNotHeartbeated. 
return nil, netutil.NewInitialHeartBeatFailedError(h.err) } return c.grpcConn, nil } -func (c *Connection) setInitialHeartbeatDone() { - c.validatedOnce.Do(func() { - close(c.initialHeartbeatDone) - }) -} - // Health returns an error indicating the success or failure of the // connection's latest heartbeat. Returns ErrNotHeartbeated if the // first heartbeat has not completed. @@ -741,6 +735,14 @@ func (ctx *Context) ConnHealth(target string) error { func (ctx *Context) runHeartbeat( conn *Connection, target string, redialChan <-chan struct{}, ) error { + initialHeartbeatDone := false + setInitialHeartbeatDone := func() { + if !initialHeartbeatDone { + close(conn.initialHeartbeatDone) + initialHeartbeatDone = true + } + } + defer setInitialHeartbeatDone() maxOffset := ctx.LocalClock.MaxOffset() clusterID := ctx.ClusterID.Get() @@ -818,7 +820,7 @@ func (ctx *Context) runHeartbeat( everSucceeded: everSucceeded, err: err, }) - conn.setInitialHeartbeatDone() + setInitialHeartbeatDone() heartbeatTimer.Reset(ctx.heartbeatInterval) } diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 8f33880b7139..5db265187d22 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -1136,6 +1136,65 @@ func TestVersionCheckBidirectional(t *testing.T) { } } +// This test ensures that clients cannot be left waiting on +// `Connection.Connect()` calls in the rare case where a heartbeat loop +// exits before attempting to send its first heartbeat. See #41521. +func TestRunHeartbeatSetsHeartbeatStateWhenExitingBeforeFirstHeartbeat(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + clock := hlc.NewClock(timeutil.Unix(0, 20).UnixNano, time.Nanosecond) + + // This test reaches into low-level implementation details to recreate + // the hazardous scenario seen in #41521. In that issue we saw a runHeartbeat() + // loop exit prior to sending the first heartbeat.
To recreate that scenario + // which seems difficult to create now that gRPC backs off redialing, we + // launch the runHeartbeat() loop with an already closed redial chan. + // In order to hit predictable errors we run an actual server on the other + // side of the Connection passed to runHeartbeat(). + // + // At least half of the time this test will hit the case where the select + // in runHeartbeat detects the closed redial chan and returns. The + // correctness criterion we're trying to verify is that the Connect call + // below does not block. + + rpcCtx := newTestContext(clock, stopper) + serverCtx := newTestContext(clock, stopper) + + s := NewServer(serverCtx) + ln, err := netutil.ListenAndServeGRPC(stopper, s, util.TestAddr) + if err != nil { + t.Fatal(err) + } + remoteAddr := ln.Addr().String() + + c := newConnection(stopper) + + redialChan := make(chan struct{}) + close(redialChan) + + c.grpcConn, _, c.dialErr = rpcCtx.GRPCDialRaw(remoteAddr) + if c.dialErr != nil { + t.Fatal(c.dialErr) + } + // It is possible that the redial chan being closed is not seen on the first + // pass through the loop. + err = rpcCtx.runHeartbeat(c, "", redialChan) + if !testutils.IsError(err, grpcutil.ErrCannotReuseClientConn.Error()) { + t.Fatalf("expected %v, got %v", grpcutil.ErrCannotReuseClientConn, err) + } + // Even when the runHeartbeat returns, we could have heartbeated successfully. + // If we did not, then we expect the `not yet heartbeated` error. + if _, err = c.Connect(ctx); err != nil && !testutils.IsError(err, "not yet heartbeated") { + t.Fatalf("expected either no error or \"not yet heartbeated\", got %v", err) + } + if err := c.grpcConn.Close(); err != nil { + t.Fatal(err) + } +} + func BenchmarkGRPCDial(b *testing.B) { if testing.Short() { b.Skip("TODO: fix benchmark")