Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-20.2: *: check node decommissioned/draining state for DistSQL/consistency #66951

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/jobs/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ func (nl *FakeNodeLiveness) IsLive(roachpb.NodeID) (bool, error) {
return false, errors.New("FakeNodeLiveness.IsLive is unimplemented")
}

// IsAvailable is unimplemented.
func (nl *FakeNodeLiveness) IsAvailable(roachpb.NodeID) bool {
panic("not implemented")
}

// IsAvailableNotDraining is unimplemented.
func (nl *FakeNodeLiveness) IsAvailableNotDraining(roachpb.NodeID) bool {
panic("not implemented")
}

// FakeIncrementEpoch increments the epoch for the node with the specified ID.
func (nl *FakeNodeLiveness) FakeIncrementEpoch(id roachpb.NodeID) {
nl.mu.Lock()
Expand Down
15 changes: 6 additions & 9 deletions pkg/kv/kvserver/consistency_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type consistencyQueue struct {
type consistencyShouldQueueData struct {
desc *roachpb.RangeDescriptor
getQueueLastProcessed func(ctx context.Context) (hlc.Timestamp, error)
isNodeLive func(nodeID roachpb.NodeID) (bool, error)
isNodeAvailable func(nodeID roachpb.NodeID) bool
disableLastProcessedCheck bool
interval time.Duration
}
Expand Down Expand Up @@ -92,12 +92,12 @@ func (q *consistencyQueue) shouldQueue(
getQueueLastProcessed: func(ctx context.Context) (hlc.Timestamp, error) {
return repl.getQueueLastProcessed(ctx, q.name)
},
isNodeLive: func(nodeID roachpb.NodeID) (bool, error) {
isNodeAvailable: func(nodeID roachpb.NodeID) bool {
if repl.store.cfg.NodeLiveness != nil {
return repl.store.cfg.NodeLiveness.IsLive(nodeID)
return repl.store.cfg.NodeLiveness.IsAvailableNotDraining(nodeID)
}
// Some tests run without a NodeLiveness configured.
return true, nil
return true
},
disableLastProcessedCheck: repl.store.cfg.TestingKnobs.DisableLastProcessedCheck,
interval: q.interval(),
Expand All @@ -123,12 +123,9 @@ func consistencyQueueShouldQueueImpl(
return false, 0
}
}
// Check if all replicas are live.
// Check if all replicas are available.
for _, rep := range data.desc.Replicas().All() {
if live, err := data.isNodeLive(rep.NodeID); err != nil {
log.VErrEventf(ctx, 3, "node %d liveness failed: %s", rep.NodeID, err)
return false, 0
} else if !live {
if !data.isNodeAvailable(rep.NodeID) {
return false, 0
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,20 @@ func TestConsistencyQueueRequiresLive(t *testing.T) {
return testStart, nil
}

isNodeLive := func(nodeID roachpb.NodeID) (bool, error) {
return live, nil
isNodeAvailable := func(nodeID roachpb.NodeID) bool {
return live
}

if shouldQ, priority := kvserver.ConsistencyQueueShouldQueue(
context.Background(), clock.Now(), desc, getQueueLastProcessed, isNodeLive,
context.Background(), clock.Now(), desc, getQueueLastProcessed, isNodeAvailable,
false, interval); !shouldQ {
t.Fatalf("expected shouldQ true; got %t, %f", shouldQ, priority)
}

live = false

if shouldQ, priority := kvserver.ConsistencyQueueShouldQueue(
context.Background(), clock.Now(), desc, getQueueLastProcessed, isNodeLive,
context.Background(), clock.Now(), desc, getQueueLastProcessed, isNodeAvailable,
false, interval); shouldQ {
t.Fatalf("expected shouldQ false; got %t, %f", shouldQ, priority)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ func ConsistencyQueueShouldQueue(
now hlc.Timestamp,
desc *roachpb.RangeDescriptor,
getQueueLastProcessed func(ctx context.Context) (hlc.Timestamp, error),
isNodeLive func(nodeID roachpb.NodeID) (bool, error),
isNodeAvailable func(nodeID roachpb.NodeID) bool,
disableLastProcessedCheck bool,
interval time.Duration,
) (bool, float64) {
return consistencyQueueShouldQueueImpl(ctx, now, consistencyShouldQueueData{
desc, getQueueLastProcessed, isNodeLive,
desc, getQueueLastProcessed, isNodeAvailable,
disableLastProcessedCheck, interval})
}

Expand Down
24 changes: 24 additions & 0 deletions pkg/kv/kvserver/node_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,30 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) {
return liveness.IsLive(nl.clock.Now().GoTime()), nil
}

// IsAvailable returns whether or not the specified node is available to serve
// requests. It checks both the liveness and decommissioned states, but not
// draining or decommissioning (since it may still be a leaseholder for ranges).
// Returns false if the node is not in the local liveness table.
func (nl *NodeLiveness) IsAvailable(nodeID roachpb.NodeID) bool {
liveness, err := nl.GetLiveness(nodeID)
return err == nil && liveness.IsLive(nl.clock.Now().GoTime()) && !liveness.Membership.Decommissioned()
}

// IsAvailableNotDraining returns whether or not the specified node is available
// to serve requests (i.e. it is live and not decommissioned) and is not in the
// process of draining/decommissioning. Note that draining/decommissioning nodes
// could still be leaseholders for ranges until drained, so this should not be
// used when the caller needs to be able to contact leaseholders directly.
// Returns false if the node is not in the local liveness table.
func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool {
liveness, err := nl.GetLiveness(nodeID)
return err == nil &&
liveness.IsLive(nl.clock.Now().GoTime()) &&
!liveness.Membership.Decommissioning() &&
!liveness.Membership.Decommissioned() &&
!liveness.Draining
}

// StartHeartbeat starts a periodic heartbeat to refresh this node's last
// heartbeat in the node liveness table. The optionally provided
// HeartbeatCallback will be invoked whenever this node updates its own
Expand Down
18 changes: 12 additions & 6 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,14 +437,20 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
NodeID: cfg.nodeIDContainer,
}

var isLive func(roachpb.NodeID) (bool, error)
if nl, ok := cfg.nodeLiveness.Optional(47900); ok {
isLive = nl.IsLive
var isAvailable func(roachpb.NodeID) bool
nodeLiveness, ok := cfg.nodeLiveness.Optional(47900)
if ok {
// TODO(erikgrinaker): We may want to use IsAvailableNotDraining instead, to
// avoid scheduling long-running flows (e.g. rangefeeds or backups) on nodes
// that are being drained/decommissioned. However, these nodes can still be
// leaseholders, and preventing processor scheduling on them can cause a
// performance cliff for e.g. table reads that then hit the network.
isAvailable = nodeLiveness.IsAvailable
} else {
// We're on a SQL tenant, so this is the only node DistSQL will ever
// schedule on - always returning true is fine.
isLive = func(roachpb.NodeID) (bool, error) {
return true, nil
isAvailable = func(roachpb.NodeID) bool {
return true
}
}

Expand Down Expand Up @@ -486,7 +492,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.nodeDescs,
cfg.gossip,
cfg.stopper,
isLive,
isAvailable,
cfg.nodeDialer,
),

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func startConnExecutor(
nil, /* nodeDescs */
gw,
stopper,
func(roachpb.NodeID) (bool, error) { return true, nil }, // everybody is live
func(roachpb.NodeID) bool { return true }, // everybody is available
nil, /* nodeDialer */
),
QueryCache: querycache.New(0),
Expand Down
28 changes: 10 additions & 18 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func NewDistSQLPlanner(
nodeDescs kvcoord.NodeDescStore,
gw gossip.OptionalGossip,
stopper *stop.Stopper,
isLive func(roachpb.NodeID) (bool, error),
isAvailable func(roachpb.NodeID) bool,
nodeDialer *nodedialer.Dialer,
) *DistSQLPlanner {
dsp := &DistSQLPlanner{
Expand All @@ -145,9 +145,9 @@ func NewDistSQLPlanner(
gossip: gw,
nodeDialer: nodeDialer,
nodeHealth: distSQLNodeHealth{
gossip: gw,
connHealth: nodeDialer.ConnHealth,
isLive: isLive,
gossip: gw,
connHealth: nodeDialer.ConnHealth,
isAvailable: isAvailable,
},
distSender: distSender,
nodeDescs: nodeDescs,
Expand Down Expand Up @@ -732,9 +732,9 @@ type SpanPartition struct {
}

type distSQLNodeHealth struct {
gossip gossip.OptionalGossip
isLive func(roachpb.NodeID) (bool, error)
connHealth func(roachpb.NodeID, rpc.ConnectionClass) error
gossip gossip.OptionalGossip
isAvailable func(roachpb.NodeID) bool
connHealth func(roachpb.NodeID, rpc.ConnectionClass) error
}

func (h *distSQLNodeHealth) check(ctx context.Context, nodeID roachpb.NodeID) error {
Expand All @@ -755,16 +755,8 @@ func (h *distSQLNodeHealth) check(ctx context.Context, nodeID roachpb.NodeID) er
return err
}
}
{
live, err := h.isLive(nodeID)
if err == nil && !live {
err = pgerror.Newf(pgcode.CannotConnectNow,
"node n%d is not live", errors.Safe(nodeID))
}
if err != nil {
return pgerror.Wrapf(err, pgcode.CannotConnectNow,
"not using n%d due to liveness", errors.Safe(nodeID))
}
if !h.isAvailable(nodeID) {
return pgerror.Newf(pgcode.CannotConnectNow, "not using n%d since it is not available", nodeID)
}

// Check that the node is not draining.
Expand All @@ -782,7 +774,7 @@ func (h *distSQLNodeHealth) check(ctx context.Context, nodeID roachpb.NodeID) er
}

if drainingInfo.Draining {
err := errors.Newf("not using n%d because it is draining", log.Safe(nodeID))
err := errors.Newf("not using n%d because it is draining", nodeID)
log.VEventf(ctx, 1, "%v", err)
return err
}
Expand Down
44 changes: 20 additions & 24 deletions pkg/sql/distsql_physical_planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,8 +854,8 @@ func TestPartitionSpans(t *testing.T) {
}
return nil
},
isLive: func(nodeID roachpb.NodeID) (bool, error) {
return true, nil
isAvailable: func(nodeID roachpb.NodeID) bool {
return true
},
},
}
Expand Down Expand Up @@ -1038,8 +1038,8 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) {
// All the nodes are healthy.
return nil
},
isLive: func(roachpb.NodeID) (bool, error) {
return true, nil
isAvailable: func(roachpb.NodeID) bool {
return true
},
},
}
Expand Down Expand Up @@ -1137,8 +1137,8 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
_, err := mockGossip.GetNodeIDAddress(node)
return err
},
isLive: func(roachpb.NodeID) (bool, error) {
return true, nil
isAvailable: func(roachpb.NodeID) bool {
return true
},
},
}
Expand Down Expand Up @@ -1202,14 +1202,11 @@ func TestCheckNodeHealth(t *testing.T) {
t.Fatal(err)
}

errLive := func(roachpb.NodeID) (bool, error) {
return false, errors.New("injected liveness error")
notAvailable := func(roachpb.NodeID) bool {
return false
}
notLive := func(roachpb.NodeID) (bool, error) {
return false, nil
}
live := func(roachpb.NodeID) (bool, error) {
return true, nil
available := func(roachpb.NodeID) bool {
return true
}

connHealthy := func(roachpb.NodeID, rpc.ConnectionClass) error {
Expand All @@ -1221,21 +1218,20 @@ func TestCheckNodeHealth(t *testing.T) {
_ = connUnhealthy

livenessTests := []struct {
isLive func(roachpb.NodeID) (bool, error)
exp string
isAvailable func(roachpb.NodeID) bool
exp string
}{
{live, ""},
{errLive, "not using n5 due to liveness: injected liveness error"},
{notLive, "not using n5 due to liveness: node n5 is not live"},
{available, ""},
{notAvailable, "not using n5 since it is not available"},
}

gw := gossip.MakeOptionalGossip(mockGossip)
for _, test := range livenessTests {
t.Run("liveness", func(t *testing.T) {
h := distSQLNodeHealth{
gossip: gw,
connHealth: connHealthy,
isLive: test.isLive,
gossip: gw,
connHealth: connHealthy,
isAvailable: test.isAvailable,
}
if err := h.check(context.Background(), nodeID); !testutils.IsError(err, test.exp) {
t.Fatalf("expected %v, got %v", test.exp, err)
Expand All @@ -1254,9 +1250,9 @@ func TestCheckNodeHealth(t *testing.T) {
for _, test := range connHealthTests {
t.Run("connHealth", func(t *testing.T) {
h := distSQLNodeHealth{
gossip: gw,
connHealth: test.connHealth,
isLive: live,
gossip: gw,
connHealth: test.connHealth,
isAvailable: available,
}
if err := h.check(context.Background(), nodeID); !testutils.IsError(err, test.exp) {
t.Fatalf("expected %v, got %v", test.exp, err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/optionalnodeliveness/node_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
type Interface interface {
Self() (kvserverpb.Liveness, error)
GetLivenesses() []kvserverpb.Liveness
IsAvailable(roachpb.NodeID) bool
IsAvailableNotDraining(roachpb.NodeID) bool
IsLive(roachpb.NodeID) (bool, error)
}

Expand Down