Skip to content

Commit

Permalink
rpc: minor cleanups
Browse files Browse the repository at this point in the history
Remove some doubled-up logging and simplify the caller of runHeartbeat.

Release note: None
  • Loading branch information
tbg committed Oct 12, 2022
1 parent 9f5818c commit 62d76f8
Showing 1 changed file with 22 additions and 27 deletions.
49 changes: 22 additions & 27 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1876,6 +1876,13 @@ func (m *connMap) TryInsert(k connKey) (_ *Connection, inserted bool) {
return newConn, true
}

func maybeFatal(ctx context.Context, err error) {
if err == nil || !buildutil.CrdbTestBuild {
return
}
log.FatalfDepth(ctx, 1, "%s", err)
}

// grpcDialNodeInternal connects to the remote node and sets up the async heartbeater.
// This intentionally takes no `context.Context`; it uses one derived from rpcCtx.masterCtx.
func (rpcCtx *Context) grpcDialNodeInternal(
Expand Down Expand Up @@ -1905,42 +1912,30 @@ func (rpcCtx *Context) grpcDialNodeInternal(
ctx,
"rpc.Context: heartbeat", func(ctx context.Context) {
rpcCtx.metrics.HeartbeatLoopsStarted.Inc(1)
defer func() {
if err := rpcCtx.m.Remove(k, conn); err != nil {
if buildutil.CrdbTestBuild {
log.Fatalf(ctx, "%s", err)
} else {
log.Errorf(ctx, "%s", err)
}
}
// Context gets canceled on server shutdown, and if that's likely why
// the connection ended don't increment the metric as a result. We don't
// want activity on the metric every time a node gracefully shuts down.
//
// NB: the ordering here is such that the metric is decremented *after*
// the connection is removed from the pool. No strong reason but feels
// nicer that way and makes for fewer test flakes.
if ctx.Err() == nil {
rpcCtx.metrics.HeartbeatLoopsExited.Inc(1)
}
}()

// Run the heartbeat; this will block until the connection breaks for
// whatever reason.
err := rpcCtx.runHeartbeat(ctx, conn, target)
if err != nil && !grpcutil.IsClosedConnection(err) &&
!grpcutil.IsConnectionRejected(err) {
log.Health.Errorf(ctx, "removing connection to %s due to error: %v", target, err)
// whatever reason. We don't actually have to do anything with the error,
// so we ignore it.
_ = rpcCtx.runHeartbeat(ctx, conn, target)
maybeFatal(ctx, rpcCtx.m.Remove(k, conn))

// Context gets canceled on server shutdown, and if that's likely why
// the connection ended don't increment the metric as a result. We don't
// want activity on the metric every time a node gracefully shuts down.
//
// NB: the ordering here is such that the metric is decremented *after*
// the connection is removed from the pool. No strong reason but feels
// nicer that way and makes for fewer test flakes.
if ctx.Err() == nil {
rpcCtx.metrics.HeartbeatLoopsExited.Inc(1)
}

// Remove the connection via defer.
}); err != nil {
// If node is draining (`err` will always equal stop.ErrUnavailable
// here), return special error (see its comments).
_ = err // ignore this error
conn.err.Store(errDialRejected)
close(conn.initialHeartbeatDone)
_ = rpcCtx.m.Remove(k, conn) // ignoring error since it's just on shutdown anyway
maybeFatal(ctx, rpcCtx.m.Remove(k, conn))
}

return conn
Expand Down

0 comments on commit 62d76f8

Please sign in to comment.