From 3e5f7d79a22426f77099693e0c06a1da44d45a3f Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 31 Aug 2020 17:58:51 -0400 Subject: [PATCH 1/2] server: busy loop through resolver list during join process Deferred doing this in #52526. Probably a good idea to have it; it'll bring down the cluster convergence time (time taken for all nodes to find out about the initialization) by a bit. Release justification: low risk, high benefit changes to existing functionality Release note: None --- pkg/server/init.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/server/init.go b/pkg/server/init.go index 887727097cea..606b83ebc64e 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -390,6 +390,24 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) e return ErrJoinRPCUnsupported } + // Busy-loop through all the resolvers at least once. Keep this code block + // roughly in sync with the one below. + for _, res := range s.config.resolvers { + addr := res.Addr() + err := s.attemptJoin(ctx, addr) + if err == nil { + return nil + } + + if errors.Is(err, ErrJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { + // Propagate upwards; these are error conditions the caller knows to + // expect. + return err + } + + // Ignore all other errors, they'll be better dealt with below. + } + const joinRPCBackoff = time.Second var tickChan <-chan time.Time { From ecc0abf39e1e15b26defd245ff3160a194ab3df7 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 1 Sep 2020 20:38:12 -0400 Subject: [PATCH 2/2] server: always create a liveness record before starting up Previously it used to be the case that it was possible for a node to be up and running, and for there to be no corresponding liveness record for it. This was a very transient situation as liveness records are created for a given node as soon as it sends out its first heartbeat.
Still, given that this could take a few seconds, it led to a lot of complexity in our handling of node liveness where we had to always anticipate the possibility of there being no corresponding liveness record for a given node (and thus creating it if necessary). It was originally thought we'd only be able to guarantee persisting a liveness record for a joining node if we had infrastructure like the join rpc, introduced in #52526, but staring at it a bit harder we can simply ensure we heartbeat our liveness record at least once before fully spinning up our server (aka opening the RPC floodgates). Having a liveness record for each node always present is a crucial building block for long running migrations (#48843). There the intention is to have the orchestrator process look towards the list of liveness records for an authoritative view of cluster membership. Previously when it was possible for an active member of the cluster to not have a corresponding liveness record (no matter how unlikely or short-lived in practice), we could not generate such a view. Release justification: low risk, high benefit changes to existing functionality Release note: None --- pkg/cmd/roachtest/decommission.go | 15 ---------- pkg/kv/kvserver/node_liveness.go | 14 +++------- pkg/server/init.go | 8 ++---- pkg/server/node.go | 4 --- pkg/server/server.go | 46 +++++++++++++++++++------------ 5 files changed, 36 insertions(+), 51 deletions(-) diff --git a/pkg/cmd/roachtest/decommission.go b/pkg/cmd/roachtest/decommission.go index 3617d0a4669c..31ab6c44060d 100644 --- a/pkg/cmd/roachtest/decommission.go +++ b/pkg/cmd/roachtest/decommission.go @@ -309,21 +309,6 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { Multiplier: 2, } - // This is a pretty gross hack to let the bootstrap info (cluster ID, - // liveness records) disseminate through the cluster. Since it's no longer - // happening through gossip, it takes a bit longer to happen.
We should do - // two things to improve our story here: - // - // - We should opportunistically write to the liveness table when adding a - // node through the Join RPC. This would also simplify the handling of - // empty liveness records (they would no longer exist). - // - We should add roachtest helpers that wait until each node has received - // cluster ID information, and use it in all the tests that need it (which - // may very well be all the tests). - // - // TODO(irfansharif): Do the above. - time.Sleep(30 * time.Second) - // Partially decommission then recommission a random node, from another // random node. Run a couple of status checks while doing so. { diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index 5e3754dac853..dddf06874ebe 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -455,16 +455,10 @@ func (nl *NodeLiveness) setMembershipStatusInternal( if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) { // Liveness record didn't previously exist, so we create one. // - // TODO(irfansharif): This code feels a bit unwieldy because it's - // possible for a liveness record to not exist previously. It is just - // generally difficult to write it at startup. When a node joins the - // cluster, this completes before it has had a chance to write its - // liveness record. If it gets decommissioned immediately, there won't - // be one yet. The Connect RPC can solve this though, I think? We can - // bootstrap clusters with a liveness record for n1. Any other node at - // some point has to join the cluster for the first time via the Connect - // RPC, which as part of its job can make sure the liveness record - // exists before responding to the new node. + // TODO(irfansharif): The above is now no longer possible. We always + // create a liveness record before fully starting up the node. 
We should + // clean up all this logic that tries to work around the possibility of + // the liveness record not existing. newLiveness = kvserverpb.Liveness{ NodeID: nodeID, Epoch: 1, diff --git a/pkg/server/init.go b/pkg/server/init.go index 606b83ebc64e..04ce177c1d03 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -175,10 +175,8 @@ func (s *initServer) needsInitLocked() bool { // necessarily all. This is fine, since initializing additional stores later is // easy. // -// `initialBoot` is true if this is a new node. This flag should only be used -// for logging and reporting. A newly bootstrapped single-node cluster is -// functionally equivalent to one that restarted; any decisions should be made -// on persisted data instead of this flag. +// `initialStart` is true if this is a new node (i.e. it was either just +// bootstrapped, or it just joined an existing cluster for the first time). // // [1]: In mixed version clusters it waits until Gossip connects (but this is // slated to be removed in 21.1). @@ -191,7 +189,7 @@ func (s *initServer) ServeAndWait( stopper *stop.Stopper, sv *settings.Values, startGossipFn func() *gossip.Gossip, -) (state *initState, initialBoot bool, err error) { +) (state *initState, initialStart bool, err error) { // If already bootstrapped, return early. s.mu.Lock() if !s.needsInitLocked() { diff --git a/pkg/server/node.go b/pkg/server/node.go index 9cd686710ba2..d2b66610beec 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1122,10 +1122,6 @@ func (n *Node) GossipSubscription( // Join implements the roachpb.InternalServer service. This is the // "connectivity" API; individual CRDB servers are passed in a --join list and // the join targets are addressed through this API. -// -// TODO(irfansharif): Perhaps we could opportunistically create a liveness -// record here so as to no longer have to worry about the liveness record not -// existing for a given node. 
func (n *Node) Join( ctx context.Context, req *roachpb.JoinNodeRequest, ) (*roachpb.JoinNodeResponse, error) { diff --git a/pkg/server/server.go b/pkg/server/server.go index 586bc9eaef8d..9ab4e19826a7 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1402,11 +1402,9 @@ func (s *Server) Start(ctx context.Context) error { // one, make sure it's the clusterID we already know (and are guaranteed to // know) at this point. If it's not the same, explode. // - // TODO(tbg): remove this when we have changed ServeAndWait() to join an - // existing cluster via a one-off RPC, at which point we can create gossip - // (and thus the RPC layer) only after the clusterID is already known. We - // can then rely on the RPC layer's protection against cross-cluster - // communication. + // TODO(irfansharif): The above is no longer applicable; in 21.1 we can + // always assume that the RPC layer will always get set up after having + // found out what the cluster ID is. The checks below can be removed then. { // We populated this above, so it should still be set. This is just to // demonstrate that we're not doing anything functional here (and to @@ -1538,6 +1536,32 @@ func (s *Server) Start(ctx context.Context) error { } }) + // Begin the node liveness heartbeat. Add a callback which records the + // local store "last up" timestamp for every store whenever the liveness + // record is updated. We're sure to do this before we open RPC + // floodgates. 
+ var livenessOnce sync.Once + livenessRecordCreated := make(chan struct{}, 1) + s.nodeLiveness.StartHeartbeat(ctx, s.stopper, s.engines, func(ctx context.Context) { + now := s.clock.Now() + if err := s.node.stores.VisitStores(func(s *kvserver.Store) error { + return s.WriteLastUpTimestamp(ctx, now) + }); err != nil { + log.Warningf(ctx, "writing last up timestamp: %v", err) + } + livenessOnce.Do(func() { + livenessRecordCreated <- struct{}{} + }) + }) + + if initialStart { + // If we're a new node being added, we're going to wait for the + // liveness record to be created before allowing all RPCs below. + <-livenessRecordCreated + } + + // Allow all RPCs, this server can now be considered to be fully + // initialized. s.grpc.setMode(modeOperational) log.Infof(ctx, "starting %s server at %s (use: %s)", @@ -1552,18 +1576,6 @@ func (s *Server) Start(ctx context.Context) error { log.Event(ctx, "accepting connections") - // Begin the node liveness heartbeat. Add a callback which records the local - // store "last up" timestamp for every store whenever the liveness record is - // updated. - s.nodeLiveness.StartHeartbeat(ctx, s.stopper, s.engines, func(ctx context.Context) { - now := s.clock.Now() - if err := s.node.stores.VisitStores(func(s *kvserver.Store) error { - return s.WriteLastUpTimestamp(ctx, now) - }); err != nil { - log.Warningf(ctx, "writing last up timestamp: %v", err) - } - }) - // Begin recording status summaries. s.node.startWriteNodeStatus(base.DefaultMetricsSampleInterval)