Skip to content

Commit

Permalink
[wip] kvserver: have autoupgrade process look at decommission status,…
Browse files Browse the repository at this point in the history
… not just availability

Fixes cockroachdb#53515.

We should have the autoupgrade process look at the fully decommissioned
bit we added in cockroachdb#50329, instead of just looking at availability. It
would avoid the hazard described in cockroachdb#53515.

Previously the autoupgrade process was also looking at
NodeStatusLiveness, which we've since soured upon (see cockroachdb#50478). Now that
we always create a liveness record on start up (cockroachdb#53805), we can simply
fetch all liveness records from KV. We add a helper to do this, which
we'll also rely on in future PRs for other purposes. It's a bit
unfortunate that we're adding further to the NodeLiveness API without
changing the caching structure, but fetching records directly from KV
is the direction we're hoping to move in going forward.

Release note: None
  • Loading branch information
irfansharif committed Oct 22, 2020
1 parent c3a391b commit e63cc1b
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 57 deletions.
132 changes: 124 additions & 8 deletions pkg/cmd/roachtest/autoupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ package main
import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/util/version"
"runtime"
"strconv"
"time"

"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/binfetcher"
"github.com/cockroachdb/errors"
)

// XXX: Replace this test in its entirety.
// This test verifies that preserve_downgrade_option is respected and that in the
// absence of it the cluster auto-upgrades when this is safe.
//
Expand All @@ -31,7 +34,7 @@ import (
// You want to look at versionupgrade.go, which has a test harness you
// can use.
func registerAutoUpgrade(r *testRegistry) {
runAutoUpgrade := func(ctx context.Context, t *test, c *cluster, oldVersion string) {
_ = func(ctx context.Context, t *test, c *cluster, oldVersion string) {
nodes := c.spec.NodeCount
goos := ifLocal(runtime.GOOS, "linux")

Expand Down Expand Up @@ -158,10 +161,11 @@ func registerAutoUpgrade(r *testRegistry) {
t.Fatal(err)
}

// XXX: This starts failing. Shouldn't
if upgraded, err := checkUpgraded(); err != nil {
t.Fatal(err)
} else if upgraded {
t.Fatal("cluster setting version shouldn't be upgraded before all non-decommissioned nodes are alive")
} else if !upgraded {
t.Fatal("cluster setting version should be upgraded before all non-decommissioned nodes are alive")
}

// Now decommission and stop n3, to test that the auto upgrade happens
Expand Down Expand Up @@ -258,11 +262,123 @@ func registerAutoUpgrade(r *testRegistry) {
MinVersion: "v19.1.0",
Cluster: makeClusterSpec(5),
Run: func(ctx context.Context, t *test, c *cluster) {
pred, err := PredecessorVersion(r.buildVersion)
if err != nil {
t.Fatal(err)
}
runAutoUpgrade(ctx, t, c, pred)
runAutoUpgrade(ctx, t, c, r.buildVersion)
},
})
}

// runAutoUpgrade exercises cluster auto-upgrade using the versionUpgradeTest
// step harness. It starts every node on the predecessor of buildVersion,
// swaps binaries to the current build in two stages, and checks the cluster
// version reported by node 1 after each stage, including while auto-upgrade is
// explicitly blocked and after it is allowed again.
//
// Any failure to resolve the predecessor version is fatal to the test.
func runAutoUpgrade(
	ctx context.Context, t *test, c *cluster, buildVersion version.Version,
) {
	// The empty version string selects the cockroach binary that was passed
	// via the `cockroach` flag.
	const mainVersion = ""

	oldVersion, err := PredecessorVersion(buildVersion)
	if err != nil {
		t.Fatal(err)
	}

	// TODO: autoupgradeTestHelper (see newAutoupgradeHelper) is not wired into
	// this test yet.

	everyNode := c.All()
	upgradeTest := newVersionUpgradeTest(c,
		// Both binaries go onto every node so that `cockroach node`
		// subcommands can be issued with either one.
		uploadVersion(everyNode, oldVersion),
		uploadVersion(everyNode, mainVersion),

		// Bring the whole cluster up on the old version first.
		startVersion(everyNode, oldVersion),
		waitForUpgradeStep(everyNode),
		// NOTE(review): the expected value here is a binary version string;
		// confirm it matches the format of `SHOW CLUSTER SETTING version`.
		checkClusterVersion(1, oldVersion),

		// XXX: fixed pause carried over from the work-in-progress test; its
		// purpose is not documented.
		sleepStep(10*time.Second),

		// Move every node but n1 to the new binary. With n1 still on the old
		// binary the cluster version is expected to stay put.
		binaryUpgradeStep(c.Nodes(2, c.spec.NodeCount), mainVersion),
		checkClusterVersion(1, oldVersion),

		// Block auto-upgrade by hand before upgrading the last binary, so the
		// cluster version is still expected to stay put.
		preventAutoUpgradeStep(1),
		binaryUpgradeStep(c.Node(1), mainVersion),
		checkClusterVersion(1, oldVersion),

		// Lift the block and expect the upgrade to go through.
		// NOTE(review): mainVersion is the empty string, so this compares the
		// cluster version against "" — confirm the intended expected value.
		allowAutoUpgradeStep(1),
		checkClusterVersion(1, mainVersion),
	)

	upgradeTest.run(ctx, t)
}

// autoupgradeTestHelper bundles the state shared by the autoupgrade test's
// node membership helpers (decommission and recommission).
type autoupgradeTestHelper struct {
// t is the running test; it is handed to execCLI when issuing CLI commands.
t *test
// c is the cluster the CLI commands are run against.
c *cluster
// nodeIDs holds the IDs 1..c.spec.NodeCount, as filled in by
// newAutoupgradeHelper.
nodeIDs []int
}

// newAutoupgradeHelper returns a helper bound to the given test and cluster,
// with nodeIDs populated with 1 through c.spec.NodeCount (inclusive).
func newAutoupgradeHelper(t *test, c *cluster) *autoupgradeTestHelper {
	helper := &autoupgradeTestHelper{t: t, c: c}
	// Node IDs are 1-based; nodeIDs stays nil when the cluster has no nodes.
	for id := 1; id <= c.spec.NodeCount; id++ {
		helper.nodeIDs = append(helper.nodeIDs, id)
	}
	return helper
}

// decommission decommissions the given targetNodes, running the process
// through the specified runNode. Any verbs are passed through as extra
// arguments directly after the subcommand. It returns whatever execCLI
// returns for the command.
func (h *autoupgradeTestHelper) decommission(
	ctx context.Context, targetNodes nodeListOption, runNode int, verbs ...string,
) (string, error) {
	return h.runMembershipCommand(ctx, "decommission", targetNodes, runNode, verbs)
}

// recommission recommissions the given targetNodes, running the process
// through the specified runNode. Any verbs are passed through as extra
// arguments directly after the subcommand. It returns whatever execCLI
// returns for the command.
func (h *autoupgradeTestHelper) recommission(
	ctx context.Context, targetNodes nodeListOption, runNode int, verbs ...string,
) (string, error) {
	return h.runMembershipCommand(ctx, "recommission", targetNodes, runNode, verbs)
}

// runMembershipCommand issues `node <subcommand> [verbs...] <targets>` on
// runNode via execCLI. It holds the argument construction shared by
// decommission and recommission.
func (h *autoupgradeTestHelper) runMembershipCommand(
	ctx context.Context, subcommand string, targetNodes nodeListOption, runNode int, verbs []string,
) (string, error) {
	args := make([]string, 0, 2+len(verbs)+len(targetNodes))
	args = append(args, "node", subcommand)
	args = append(args, verbs...)

	if len(targetNodes) == 1 && targetNodes[0] == runNode {
		// The only target is the node the command runs on, so address it with
		// --self rather than by ID.
		args = append(args, "--self")
	} else {
		for _, target := range targetNodes {
			args = append(args, strconv.Itoa(target))
		}
	}
	return execCLI(ctx, h.t, h.c, runNode, args...)
}

// checkClusterVersion returns a versionStep that reads the `version` cluster
// setting through a SQL connection to the given node and fails the test if it
// differs from the expected version string.
//
// The step is read-only. It deliberately does not touch
// cluster.preserve_downgrade_option: pinning that setting from inside a check
// would interfere with the surrounding prevent/allow auto-upgrade steps, and
// in particular would re-block the upgrade right after it has been allowed.
func checkClusterVersion(node int, version string) versionStep {
	return func(ctx context.Context, t *test, u *versionUpgradeTest) {
		db := u.conn(ctx, t, node)

		var clusterVersion string
		if err := db.QueryRowContext(ctx, `SHOW CLUSTER SETTING version`).Scan(&clusterVersion); err != nil {
			t.Fatal(err)
		}

		if clusterVersion != version {
			t.Fatalf("expected cluster version %s, got %s", version, clusterVersion)
		}
	}
}
41 changes: 37 additions & 4 deletions pkg/kv/kvserver/node_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions
oldLiveness, ok := nl.Self()
if !ok {
nodeID := nl.gossip.NodeID.Get()
liveness, err := nl.getLivenessFromKV(ctx, nodeID)
liveness, err := nl.GetLivenessFromKV(ctx, nodeID)
if err != nil {
log.Infof(ctx, "unable to get liveness record from KV: %s", err)
continue
Expand Down Expand Up @@ -1002,6 +1002,39 @@ func (nl *NodeLiveness) GetLivenesses() []kvserverpb.Liveness {
return livenesses
}

// GetLivenessesFromKV reads every liveness record stored in KV between
// keys.NodeLivenessPrefix and keys.NodeLivenessKeyMax and returns the decoded
// protos. Unlike GetLivenesses, which only consults the (possibly stale)
// in-memory cache, this goes to the database; each record read is also fed
// through maybeUpdate to refresh that cache.
//
// An error is returned if the scan fails, if a scanned key has no value, or
// if a value cannot be decoded as a liveness proto. No partial results are
// returned on error.
func (nl *NodeLiveness) GetLivenessesFromKV(ctx context.Context) ([]kvserverpb.Liveness, error) {
	// A max-rows argument of 0 is passed to the scan (presumably meaning "no
	// limit" — confirm against the kv.DB.Scan contract).
	rows, err := nl.db.Scan(ctx, keys.NodeLivenessPrefix, keys.NodeLivenessKeyMax, 0)
	if err != nil {
		return nil, errors.Wrap(err, "unable to get liveness")
	}

	var livenesses []kvserverpb.Liveness
	for i := range rows {
		value := rows[i].Value
		if value == nil {
			return nil, errors.AssertionFailedf("missing liveness record")
		}

		var decoded kvserverpb.Liveness
		if err := value.GetProto(&decoded); err != nil {
			return nil, errors.Wrap(err, "invalid liveness record")
		}

		// Hand the freshly read record, together with its raw encoding, to
		// the in-memory cache.
		nl.maybeUpdate(ctx, LivenessRecord{
			Liveness: decoded,
			raw:      value.TagAndDataBytes(),
		})

		livenesses = append(livenesses, decoded)
	}

	return livenesses, nil
}

// GetLiveness returns the liveness record for the specified nodeID. If the
// liveness record is not found (due to gossip propagation delays or due to the
// node not existing), we surface that to the caller. The record returned also
Expand All @@ -1024,9 +1057,9 @@ func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (_ LivenessReco
return LivenessRecord{}, false
}

// getLivenessFromKV fetches the liveness record from KV for a given node, and
// GetLivenessFromKV fetches the liveness record from KV for a given node, and
// updates the internal in-memory cache when doing so.
func (nl *NodeLiveness) getLivenessFromKV(
func (nl *NodeLiveness) GetLivenessFromKV(
ctx context.Context, nodeID roachpb.NodeID,
) (kvserverpb.Liveness, error) {
livenessRec, err := nl.getLivenessRecordFromKV(ctx, nodeID)
Expand All @@ -1036,7 +1069,7 @@ func (nl *NodeLiveness) getLivenessFromKV(
return livenessRec.Liveness, nil
}

// getLivenessRecordFromKV is like getLivenessFromKV, but returns the raw,
// getLivenessRecordFromKV is like GetLivenessFromKV, but returns the raw,
// encoded value that the database has for this liveness record in addition to
// the decoded liveness proto.
func (nl *NodeLiveness) getLivenessRecordFromKV(
Expand Down
87 changes: 43 additions & 44 deletions pkg/server/autoupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -49,10 +50,12 @@ func (s *Server) startAutoUpgrade(ctx context.Context) {

// Check if we should upgrade cluster version, keep checking upgrade
// status, or stop attempting upgrade.
if quit, err := s.upgradeStatus(ctx); err != nil {
shouldUpgrade, err := s.shouldUpgrade(ctx)
if err != nil {
log.Infof(ctx, "failed attempt to upgrade cluster version, error: %s", err)
continue
} else if quit {
}
if !shouldUpgrade {
log.Info(ctx, "no need to upgrade, cluster already at the newest version")
return
}
Expand Down Expand Up @@ -86,56 +89,56 @@ func (s *Server) startAutoUpgrade(ctx context.Context) {
}
}

// upgradeStatus lets the main checking loop know if we should do upgrade,
// keep checking upgrade status, or stop attempting upgrade.
// Return (true, nil) to indicate we want to stop attempting upgrade.
// Return (false, nil) to indicate we want to do the upgrade.
// Return (false, err) to indicate we want to keep checking upgrade status.
func (s *Server) upgradeStatus(ctx context.Context) (bool, error) {
// Check if all nodes are running at the newest version.
clusterVersion, err := s.clusterVersion(ctx)
// shouldUpgrade lets the autoupgrade loop know whether or not we should
// upgrade, or whether it should retry later because we can't yet tell.
// - (true, nil) indicates that we should upgrade
// - (false, nil) indicates that we shouldn't upgrade
// - ( , err) indicates we don't know one way or another, and should retry
func (s *Server) shouldUpgrade(ctx context.Context) (should bool, _ error) {
livenesses, err := s.nodeLiveness.GetLivenessesFromKV(ctx)
if err != nil {
return false, err
}

nodesWithLiveness, err := s.status.nodesStatusWithLiveness(ctx)
if len(livenesses) == 0 {
return false, errors.AssertionFailedf("didn't find any liveness records")
}

for _, liveness := range livenesses {
// For nodes that have expired and haven't been decommissioned, stall
// auto-upgrade.
expiration := hlc.Timestamp(liveness.Expiration)
if expiration.Less(s.clock.Now()) && !liveness.Membership.Decommissioned() {
return false, errors.Errorf("node %d not live (since %s), and not decommissioned", liveness.NodeID, expiration.String())
}
}

resp, err := s.status.Nodes(ctx, &serverpb.NodesRequest{})
if err != nil {
return false, err
}

var newVersion string
var notRunningErr error
for nodeID, st := range nodesWithLiveness {
if st.livenessStatus != kvserverpb.NodeLivenessStatus_LIVE &&
st.livenessStatus != kvserverpb.NodeLivenessStatus_DECOMMISSIONING {
// We definitely won't be able to upgrade, but defer this error as
// we may find out that we are already at the latest version (the
// cluster may be up to date, but a node is down).
if notRunningErr == nil {
notRunningErr = errors.Errorf("node %d not running (%s), cannot determine version", nodeID, st.livenessStatus)
}
continue
}
if len(resp.Nodes) != len(livenesses) {
return false, errors.AssertionFailedf("mismatched number of node statuses, expected %d got %d", len(livenesses), len(resp.Nodes))
}

version := st.NodeStatus.Desc.ServerVersion.String()
if newVersion == "" {
newVersion = version
} else if version != newVersion {
return false, errors.Newf("not all nodes are running the latest version yet (saw %s and %s)", newVersion, version)
firstVersion := resp.Nodes[0].Desc.ServerVersion
for _, status := range resp.Nodes[1:] {
// Check version of all nodes.
if status.Desc.ServerVersion != firstVersion {
return false, errors.Newf("mismatched server versions between nodes (saw %s and %s)", firstVersion, status.Desc.ServerVersion)
}
}

if newVersion == "" {
return false, errors.Errorf("no live nodes found")
// Check if all nodes are running at the newest version.
clusterVersion, err := s.clusterVersion(ctx)
if err != nil {
return false, err
}

// Check if we really need to upgrade cluster version.
if newVersion == clusterVersion {
return true, nil
}

if notRunningErr != nil {
return false, notRunningErr
if firstVersion.String() == clusterVersion {
return false, nil
}

// Check if auto upgrade is enabled at current version. This is read from
Expand All @@ -151,15 +154,12 @@ func (s *Server) upgradeStatus(ctx context.Context) (bool, error) {
}

if len(datums) != 0 {
row := datums[0]
downgradeVersion := string(tree.MustBeDString(row[0]))

downgradeVersion := string(tree.MustBeDString(datums[0][0]))
if clusterVersion == downgradeVersion {
return false, errors.Errorf("auto upgrade is disabled for current version: %s", clusterVersion)
}
}

return false, nil
return true, nil
}

// clusterVersion returns the current cluster version from the SQL subsystem
Expand All @@ -179,6 +179,5 @@ func (s *Server) clusterVersion(ctx context.Context) (string, error) {
}
row := datums[0]
clusterVersion := string(tree.MustBeDString(row[0]))

return clusterVersion, nil
}
2 changes: 1 addition & 1 deletion pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,7 @@ func (s *statusServer) Profile(
// that status "UNKNOWN" has value 0 (the default) when accessing the
// map.
func (s *statusServer) Nodes(
ctx context.Context, req *serverpb.NodesRequest,
ctx context.Context, _ *serverpb.NodesRequest,
) (*serverpb.NodesResponse, error) {
ctx = propagateGatewayMetadata(ctx)
ctx = s.AnnotateCtx(ctx)
Expand Down

0 comments on commit e63cc1b

Please sign in to comment.