From e63cc1b6933cf6b17a25d0d6657f230b5e704251 Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Thu, 22 Oct 2020 00:10:58 -0400
Subject: [PATCH] [wip] kvserver: have autoupgrade process look at
 decommission status, not just availability

Fixes #53515.

We should have the autoupgrade process look at the fully decommissioned
bit we added in #50329, instead of just looking at availability. This
avoids the hazard described in #53515.

Previously the autoupgrade process was also looking at
NodeStatusLiveness, which we've since soured on (see #50478). Now that
we always create a liveness record on startup (#53805), we can simply
fetch all liveness records from KV. We add a helper to do this, which
we'll also rely on in future PRs for other purposes. It's a bit
unfortunate that we're further adding to the NodeLiveness API without
changing the caching structure, but fetching records directly from KV
is the model we're hoping to move towards going forward.

Release note: None
---
 pkg/cmd/roachtest/autoupgrade.go | 132 +++++++++++++++++++++++++++++--
 pkg/kv/kvserver/node_liveness.go |  41 +++++++++-
 pkg/server/autoupgrade.go        |  87 ++++++++++----------
 pkg/server/status.go             |   2 +-
 4 files changed, 205 insertions(+), 57 deletions(-)

diff --git a/pkg/cmd/roachtest/autoupgrade.go b/pkg/cmd/roachtest/autoupgrade.go
index 9ffb596c4772..a28559a698ef 100644
--- a/pkg/cmd/roachtest/autoupgrade.go
+++ b/pkg/cmd/roachtest/autoupgrade.go
@@ -13,7 +13,9 @@ package main
 import (
 	"context"
 	"fmt"
+	"github.com/cockroachdb/cockroach/pkg/util/version"
 	"runtime"
+	"strconv"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -21,6 +23,7 @@ import (
 	"github.com/cockroachdb/errors"
 )
 
+// XXX: Replace this test in its entirety.
 // This test verifies that preserve_downgrade_option is respected and that in the
 // absence of it the cluster auto-upgrades when this is safe.
 //
@@ -31,7 +34,7 @@
 // You want to look at versionupgrade.go, which has a test harness you
 // can use.
 func registerAutoUpgrade(r *testRegistry) {
-	runAutoUpgrade := func(ctx context.Context, t *test, c *cluster, oldVersion string) {
+	_ = func(ctx context.Context, t *test, c *cluster, oldVersion string) {
 		nodes := c.spec.NodeCount
 		goos := ifLocal(runtime.GOOS, "linux")
 
@@ -158,10 +161,11 @@ func registerAutoUpgrade(r *testRegistry) {
 			t.Fatal(err)
 		}
 
+		// XXX: This starts failing; it shouldn't.
 		if upgraded, err := checkUpgraded(); err != nil {
 			t.Fatal(err)
-		} else if upgraded {
-			t.Fatal("cluster setting version shouldn't be upgraded before all non-decommissioned nodes are alive")
+		} else if !upgraded {
+			t.Fatal("cluster setting version should be upgraded before all non-decommissioned nodes are alive")
 		}
 
 		// Now decommission and stop n3, to test that the auto upgrade happens
@@ -258,11 +262,123 @@
 		MinVersion: "v19.1.0",
 		Cluster:    makeClusterSpec(5),
 		Run: func(ctx context.Context, t *test, c *cluster) {
-			pred, err := PredecessorVersion(r.buildVersion)
-			if err != nil {
-				t.Fatal(err)
-			}
-			runAutoUpgrade(ctx, t, c, pred)
+			runAutoUpgrade(ctx, t, c, r.buildVersion)
 		},
 	})
 }
+
+func runAutoUpgrade(
+	ctx context.Context, t *test, c *cluster, buildVersion version.Version,
+) {
+	predecessorVersion, err := PredecessorVersion(buildVersion)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// h := newAutoupgradeHelper(t, c)
+
+	// An empty string means that the cockroach binary specified by flag
+	// `cockroach` will be used.
+	const mainVersion = ""
+	allNodes := c.All()
+	u := newVersionUpgradeTest(c,
+		// We upload both binaries to each node, to be able to vary the binary
+		// used when issuing `cockroach node` subcommands.
+		uploadVersion(allNodes, predecessorVersion),
+		uploadVersion(allNodes, mainVersion),
+
+		startVersion(allNodes, predecessorVersion),
+		waitForUpgradeStep(allNodes),
+		checkClusterVersion(1, predecessorVersion),
+
+		// XXX:
+		sleepStep(10*time.Second),
+
+		// Upgrade all binaries except for one.
+		binaryUpgradeStep(c.Nodes(2, c.spec.NodeCount), mainVersion),
+		checkClusterVersion(1, predecessorVersion),
+
+		// Block autoupgrade manually.
+		preventAutoUpgradeStep(1),
+		binaryUpgradeStep(c.Node(1), mainVersion),
+		checkClusterVersion(1, predecessorVersion),
+
+		allowAutoUpgradeStep(1),
+		checkClusterVersion(1, mainVersion),
+	)
+
+	u.run(ctx, t)
+}
+
+type autoupgradeTestHelper struct {
+	t       *test
+	c       *cluster
+	nodeIDs []int
+}
+
+func newAutoupgradeHelper(t *test, c *cluster) *autoupgradeTestHelper {
+	var nodeIDs []int
+	for i := 1; i <= c.spec.NodeCount; i++ {
+		nodeIDs = append(nodeIDs, i)
+	}
+	return &autoupgradeTestHelper{
+		t:       t,
+		c:       c,
+		nodeIDs: nodeIDs,
+	}
+}
+
+// decommission decommissions the given targetNodes, running the process
+// through the specified runNode.
+func (h *autoupgradeTestHelper) decommission(
+	ctx context.Context, targetNodes nodeListOption, runNode int, verbs ...string,
+) (string, error) {
+	args := []string{"node", "decommission"}
+	args = append(args, verbs...)
+
+	if len(targetNodes) == 1 && targetNodes[0] == runNode {
+		args = append(args, "--self")
+	} else {
+		for _, target := range targetNodes {
+			args = append(args, strconv.Itoa(target))
+		}
+	}
+	return execCLI(ctx, h.t, h.c, runNode, args...)
+}
+
+// recommission recommissions the given targetNodes, running the process
+// through the specified runNode.
+func (h *autoupgradeTestHelper) recommission(
+	ctx context.Context, targetNodes nodeListOption, runNode int, verbs ...string,
+) (string, error) {
+	args := []string{"node", "recommission"}
+	args = append(args, verbs...)
+
+	if len(targetNodes) == 1 && targetNodes[0] == runNode {
+		args = append(args, "--self")
+	} else {
+		for _, target := range targetNodes {
+			args = append(args, strconv.Itoa(target))
+		}
+	}
+	return execCLI(ctx, h.t, h.c, runNode, args...)
+}
+
+// checkClusterVersion returns a step that verifies that the active cluster
+// version (the `version` cluster setting) on the given node matches the
+// expected version.
+func checkClusterVersion(node int, version string) versionStep {
+	return func(ctx context.Context, t *test, u *versionUpgradeTest) {
+		db := u.conn(ctx, t, node)
+
+		var clusterVersion string
+		if err := db.QueryRowContext(ctx, `SHOW CLUSTER SETTING version`).Scan(&clusterVersion); err != nil {
+			t.Fatal(err)
+		}
+
+		if clusterVersion != version {
+			t.Fatalf("expected cluster version %s, got %s", version, clusterVersion)
+		}
+	}
+}
diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go
index a75d872b9f5f..5370b32988f1 100644
--- a/pkg/kv/kvserver/node_liveness.go
+++ b/pkg/kv/kvserver/node_liveness.go
@@ -695,7 +695,7 @@ func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions
 			oldLiveness, ok := nl.Self()
 			if !ok {
 				nodeID := nl.gossip.NodeID.Get()
-				liveness, err := nl.getLivenessFromKV(ctx, nodeID)
+				liveness, err := nl.GetLivenessFromKV(ctx, nodeID)
 				if err != nil {
 					log.Infof(ctx, "unable to get liveness record from KV: %s", err)
 					continue
@@ -1002,6 +1002,39 @@ func (nl *NodeLiveness) GetLivenesses() []kvserverpb.Liveness {
 	return livenesses
 }
 
+// GetLivenessesFromKV returns a slice containing the liveness record of every
+// node in the cluster. It is an improved version of GetLivenesses above, which
+// only consults the (possibly stale) in-memory cache.
+func (nl *NodeLiveness) GetLivenessesFromKV(ctx context.Context) ([]kvserverpb.Liveness, error) {
+	kvs, err := nl.db.Scan(ctx, keys.NodeLivenessPrefix, keys.NodeLivenessKeyMax, 0)
+	if err != nil {
+		return nil, errors.Wrap(err, "unable to get liveness")
+	}
+
+	var results []kvserverpb.Liveness
+	for _, kv := range kvs {
+		if kv.Value == nil {
+			return nil, errors.AssertionFailedf("missing liveness record")
+		}
+		var liveness kvserverpb.Liveness
+		if err := kv.Value.GetProto(&liveness); err != nil {
+			return nil, errors.Wrap(err, "invalid liveness record")
+		}
+
+		livenessRec := LivenessRecord{
+			Liveness: liveness,
+			raw:      kv.Value.TagAndDataBytes(),
+		}
+
+		// Update our cache with the liveness record we just found.
+		nl.maybeUpdate(ctx, livenessRec)
+
+		results = append(results, liveness)
+	}
+
+	return results, nil
+}
+
 // GetLiveness returns the liveness record for the specified nodeID. If the
 // liveness record is not found (due to gossip propagation delays or due to the
 // node not existing), we surface that to the caller. The record returned also
@@ -1024,9 +1057,9 @@ func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (_ LivenessReco
 	return LivenessRecord{}, false
 }
 
-// getLivenessFromKV fetches the liveness record from KV for a given node, and
+// GetLivenessFromKV fetches the liveness record from KV for a given node, and
 // updates the internal in-memory cache when doing so.
-func (nl *NodeLiveness) getLivenessFromKV(
+func (nl *NodeLiveness) GetLivenessFromKV(
 	ctx context.Context, nodeID roachpb.NodeID,
 ) (kvserverpb.Liveness, error) {
 	livenessRec, err := nl.getLivenessRecordFromKV(ctx, nodeID)
@@ -1036,7 +1069,7 @@ func (nl *NodeLiveness) GetLivenessFromKV(
 	return livenessRec.Liveness, nil
 }
 
-// getLivenessRecordFromKV is like getLivenessFromKV, but returns the raw,
+// getLivenessRecordFromKV is like GetLivenessFromKV, but returns the raw,
 // encoded value that the database has for this liveness record in addition to
 // the decoded liveness proto.
 func (nl *NodeLiveness) getLivenessRecordFromKV(
diff --git a/pkg/server/autoupgrade.go b/pkg/server/autoupgrade.go
index 4aac118087e7..e2b80a8b0f90 100644
--- a/pkg/server/autoupgrade.go
+++ b/pkg/server/autoupgrade.go
@@ -15,10 +15,11 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/security"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/errors"
@@ -49,10 +50,12 @@ func (s *Server) startAutoUpgrade(ctx context.Context) {
 
 		// Check if we should upgrade cluster version, keep checking upgrade
 		// status, or stop attempting upgrade.
-		if quit, err := s.upgradeStatus(ctx); err != nil {
+		shouldUpgrade, err := s.shouldUpgrade(ctx)
+		if err != nil {
 			log.Infof(ctx, "failed attempt to upgrade cluster version, error: %s", err)
 			continue
-		} else if quit {
+		}
+		if !shouldUpgrade {
 			log.Info(ctx, "no need to upgrade, cluster already at the newest version")
 			return
 		}
@@ -86,56 +89,56 @@ func (s *Server) startAutoUpgrade(ctx context.Context) {
 	}
 }
 
-// upgradeStatus lets the main checking loop know if we should do upgrade,
-// keep checking upgrade status, or stop attempting upgrade.
-// Return (true, nil) to indicate we want to stop attempting upgrade.
-// Return (false, nil) to indicate we want to do the upgrade.
-// Return (false, err) to indicate we want to keep checking upgrade status.
-func (s *Server) upgradeStatus(ctx context.Context) (bool, error) {
-	// Check if all nodes are running at the newest version.
-	clusterVersion, err := s.clusterVersion(ctx)
+// shouldUpgrade lets the autoupgrade loop know whether we should upgrade the
+// cluster version, or whether it should retry later because we can't yet tell.
+// - (true, nil) indicates that we should upgrade
+// - (false, nil) indicates that we shouldn't upgrade
+// - (_, err) indicates that we don't know one way or the other, and should retry
+func (s *Server) shouldUpgrade(ctx context.Context) (should bool, _ error) {
+	livenesses, err := s.nodeLiveness.GetLivenessesFromKV(ctx)
 	if err != nil {
 		return false, err
 	}
-	nodesWithLiveness, err := s.status.nodesStatusWithLiveness(ctx)
+	if len(livenesses) == 0 {
+		return false, errors.AssertionFailedf("didn't find any liveness records")
+	}
+
+	for _, liveness := range livenesses {
+		// For nodes that have expired and haven't been decommissioned, stall
+		// auto-upgrade.
+		expiration := hlc.Timestamp(liveness.Expiration)
+		if expiration.Less(s.clock.Now()) && !liveness.Membership.Decommissioned() {
+			return false, errors.Errorf("node %d not live (since %s), and not decommissioned", liveness.NodeID, expiration.String())
+		}
+	}
+
+	resp, err := s.status.Nodes(ctx, &serverpb.NodesRequest{})
 	if err != nil {
 		return false, err
 	}
 
-	var newVersion string
-	var notRunningErr error
-	for nodeID, st := range nodesWithLiveness {
-		if st.livenessStatus != kvserverpb.NodeLivenessStatus_LIVE &&
-			st.livenessStatus != kvserverpb.NodeLivenessStatus_DECOMMISSIONING {
-			// We definitely won't be able to upgrade, but defer this error as
-			// we may find out that we are already at the latest version (the
-			// cluster may be up to date, but a node is down).
-			if notRunningErr == nil {
-				notRunningErr = errors.Errorf("node %d not running (%s), cannot determine version", nodeID, st.livenessStatus)
-			}
-			continue
-		}
+	if len(resp.Nodes) != len(livenesses) {
+		return false, errors.AssertionFailedf("mismatched number of node statuses, expected %d got %d", len(livenesses), len(resp.Nodes))
+	}
 
-		version := st.NodeStatus.Desc.ServerVersion.String()
-		if newVersion == "" {
-			newVersion = version
-		} else if version != newVersion {
-			return false, errors.Newf("not all nodes are running the latest version yet (saw %s and %s)", newVersion, version)
+	firstVersion := resp.Nodes[0].Desc.ServerVersion
+	for _, status := range resp.Nodes[1:] {
+		// Check version of all nodes.
+		if status.Desc.ServerVersion != firstVersion {
+			return false, errors.Newf("mismatched server versions between nodes (saw %s and %s)", firstVersion, status.Desc.ServerVersion)
 		}
 	}
 
-	if newVersion == "" {
-		return false, errors.Errorf("no live nodes found")
+	// Check if all nodes are running at the newest version.
+	clusterVersion, err := s.clusterVersion(ctx)
+	if err != nil {
+		return false, err
 	}
 
 	// Check if we really need to upgrade cluster version.
-	if newVersion == clusterVersion {
-		return true, nil
-	}
-
-	if notRunningErr != nil {
-		return false, notRunningErr
+	if firstVersion.String() == clusterVersion {
+		return false, nil
 	}
 
 	// Check if auto upgrade is enabled at current version. This is read from
@@ -151,15 +154,12 @@ func (s *Server) upgradeStatus(ctx context.Context) (bool, error) {
 	}
 
 	if len(datums) != 0 {
-		row := datums[0]
-		downgradeVersion := string(tree.MustBeDString(row[0]))
-
+		downgradeVersion := string(tree.MustBeDString(datums[0][0]))
 		if clusterVersion == downgradeVersion {
 			return false, errors.Errorf("auto upgrade is disabled for current version: %s", clusterVersion)
 		}
 	}
-
-	return false, nil
+	return true, nil
 }
 
 // clusterVersion returns the current cluster version from the SQL subsystem
@@ -179,6 +179,5 @@ func (s *Server) clusterVersion(ctx context.Context) (string, error) {
 	}
 	row := datums[0]
 	clusterVersion := string(tree.MustBeDString(row[0]))
-
 	return clusterVersion, nil
 }
diff --git a/pkg/server/status.go b/pkg/server/status.go
index 380919cc4cea..bb98b3eacaaa 100644
--- a/pkg/server/status.go
+++ b/pkg/server/status.go
@@ -1256,7 +1256,7 @@ func (s *statusServer) Profile(
 // that status "UNKNOWN" has value 0 (the default) when accessing the
 // map.
 func (s *statusServer) Nodes(
-	ctx context.Context, req *serverpb.NodesRequest,
+	ctx context.Context, _ *serverpb.NodesRequest,
) (*serverpb.NodesResponse, error) {
 	ctx = propagateGatewayMetadata(ctx)
 	ctx = s.AnnotateCtx(ctx)
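
Not part of the patch: for reviewers, below is a minimal, self-contained sketch of the decision the new shouldUpgrade check makes. The names here (livenessBlocksUpgrade, the pared-down liveness struct) are illustrative only and don't exist in the tree; the real implementation is the loop over GetLivenessesFromKV in pkg/server/autoupgrade.go above.

package main

import (
	"fmt"
	"time"
)

// liveness is a pared-down stand-in for kvserverpb.Liveness; only the fields
// the autoupgrade check cares about are modeled here.
type liveness struct {
	nodeID         int
	expiration     time.Time
	decommissioned bool
}

// livenessBlocksUpgrade mirrors the idea behind the new check: a node stalls
// auto-upgrade only if its liveness has expired AND it has not been fully
// decommissioned. Fully decommissioned nodes are ignored, which is the
// behavior change targeting #53515.
func livenessBlocksUpgrade(now time.Time, livenesses []liveness) error {
	for _, l := range livenesses {
		if l.expiration.Before(now) && !l.decommissioned {
			return fmt.Errorf("node %d not live (since %s), and not decommissioned",
				l.nodeID, l.expiration)
		}
	}
	return nil
}

func main() {
	now := time.Now()
	deadAndDecommissioned := liveness{nodeID: 3, expiration: now.Add(-time.Hour), decommissioned: true}
	deadOnly := liveness{nodeID: 4, expiration: now.Add(-time.Hour)}

	fmt.Println(livenessBlocksUpgrade(now, []liveness{deadAndDecommissioned})) // <nil>: does not block the upgrade
	fmt.Println(livenessBlocksUpgrade(now, []liveness{deadOnly}))              // error: blocks the upgrade
}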