Skip to content

Commit 61a6dbc

Browse files
authored
Add client scheduling eligibility to heartbeat (#14483)
1 parent f2186be commit 61a6dbc

File tree

4 files changed

+23
-4
lines changed

4 files changed

+23
-4
lines changed

.changelog/14483.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
metrics: Update client `node_scheduling_eligibility` value with server heartbeats.
3+
```

client/client.go

+8
Original file line numberDiff line numberDiff line change
@@ -1955,6 +1955,14 @@ func (c *Client) updateNodeStatus() error {
19551955
}
19561956
}
19571957

1958+
// Check the heartbeat response for information about the server-side
1959+
// scheduling state of this node.
1960+
c.UpdateConfig(func(c *config.Config) {
1961+
if resp.SchedulingEligibility != "" {
1962+
c.Node.SchedulingEligibility = resp.SchedulingEligibility
1963+
}
1964+
})
1965+
19581966
// Update the number of nodes in the cluster so we can adjust our server
19591967
// rebalance rate.
19601968
c.servers.SetNumNodes(resp.NumNodes)

nomad/node_endpoint.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
199199

200200
n.srv.peerLock.RLock()
201201
defer n.srv.peerLock.RUnlock()
202-
if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
202+
if err := n.constructNodeServerInfoResponse(args.Node.ID, snap, reply); err != nil {
203203
n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
204204
return err
205205
}
@@ -258,7 +258,7 @@ func equalDevices(n1, n2 *structs.Node) bool {
258258
}
259259

260260
// constructNodeServerInfoResponse assumes the n.srv.peerLock is held for reading.
261-
func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error {
261+
func (n *Node) constructNodeServerInfoResponse(nodeID string, snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error {
262262
reply.LeaderRPCAddr = string(n.srv.raft.Leader())
263263

264264
// Reply with config information required for future RPC requests
@@ -271,6 +271,10 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
271271
})
272272
}
273273

274+
// Add the node's scheduling eligibility to the heartbeat response.
275+
node, _ := snap.NodeByID(nil, nodeID)
276+
reply.SchedulingEligibility = node.SchedulingEligibility
277+
274278
// TODO(sean@): Use an indexed node count instead
275279
//
276280
// Snapshot is used only to iterate over all nodes to create a node
@@ -564,7 +568,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
564568
reply.Index = index
565569
n.srv.peerLock.RLock()
566570
defer n.srv.peerLock.RUnlock()
567-
if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
571+
if err := n.constructNodeServerInfoResponse(node.GetID(), snap, reply); err != nil {
568572
n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
569573
return err
570574
}
@@ -821,7 +825,7 @@ func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUp
821825

822826
n.srv.peerLock.RLock()
823827
defer n.srv.peerLock.RUnlock()
824-
if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
828+
if err := n.constructNodeServerInfoResponse(node.GetID(), snap, reply); err != nil {
825829
n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
826830
return err
827831
}

nomad/structs/structs.go

+4
Original file line numberDiff line numberDiff line change
@@ -1361,6 +1361,10 @@ type NodeUpdateResponse struct {
13611361
// region.
13621362
Servers []*NodeServerInfo
13631363

1364+
// SchedulingEligibility tells a client which scheduling eligibility the server
1365+
// side currently has recorded for it, as reported during heartbeats.
1366+
SchedulingEligibility string
1367+
13641368
QueryMeta
13651369
}
13661370

0 commit comments

Comments
 (0)