WIP: update when attrs changed
Signed-off-by: Francesco Romani <[email protected]>
ffromani committed Jan 22, 2024
1 parent 6f34682 commit 0d6a82b
Showing 4 changed files with 400 additions and 38 deletions.
95 changes: 71 additions & 24 deletions pkg/noderesourcetopology/cache/overreserve.go
@@ -29,12 +29,14 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
podlisterv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"

ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/nodeconfig"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify"
@@ -153,33 +155,43 @@ func (ov *OverReserve) UnreserveNodeResources(nodeName string, pod *corev1.Pod)
klog.V(5).InfoS("nrtcache post release", "logID", klog.KObj(pod), "node", nodeName, "assumedResources", nodeAssumedResources.String())
}

// NodesMaybeOverReserved returns a slice of all the node names which have been discarded previously,
// so which are supposed to be `dirty` in the cache.
// GetNodesOverReservationStatus returns a slice of all the node names which have been discarded previously,
// and which are therefore considered `dirty` in the cache, plus a slice of all the other known nodes, which are expected to be clean.
// The union of the returned slices must be the total set of known nodes.
// A node can be discarded for two reasons:
// 1. it legitimately cannot fit containers because it does not have enough free resources
// 2. it was pessimistically overallocated, so the node is a candidate for resync
// This function enables the caller to know which nodes should be considered for resync,
// avoiding the need to rescan the full node list.
func (ov *OverReserve) NodesMaybeOverReserved(logID string) []string {
func (ov *OverReserve) GetNodesOverReservationStatus(logID string) ([]string, []string) {
ov.lock.Lock()
defer ov.lock.Unlock()

allNodes := sets.New[string](ov.nrts.NodeNames()...)
// this is intentionally aggressive. We don't yet make any attempt to find out if the
// node was discarded because it was pessimistically overreserved (which should indeed trigger
// a resync) or if it was discarded because the actual resources on the node really were
// exhausted. We do it like this because it is the safest approach. We will optimize
// the node selection logic later on to make the resync procedure less aggressive but
// still correct.
nodes := ov.nodesWithForeignPods.Clone()
foreignCount := nodes.Len()
dirtyNodes := ov.nodesWithForeignPods.Clone()
foreignCount := dirtyNodes.Len()

for _, node := range ov.nodesMaybeOverreserved.Keys() {
nodes.Incr(node)
dirtyNodes.Incr(node)
}

dirtyList := dirtyNodes.Keys()
if len(dirtyList) > 0 {
klog.V(4).InfoS("nrtcache: found dirty nodes", "logID", logID, "foreign", foreignCount, "discarded", len(dirtyList)-foreignCount, "total", len(dirtyList), "allNodes", len(allNodes))
}

if nodes.Len() > 0 {
klog.V(4).InfoS("nrtcache: found dirty nodes", "logID", logID, "foreign", foreignCount, "discarded", nodes.Len()-foreignCount, "total", nodes.Len())
cleanList := allNodes.Delete(dirtyList...).UnsortedList()
if len(cleanList) > 0 {
klog.V(4).InfoS("nrtcache: found clean nodes", "logID", logID, "dirtyCount", len(dirtyList), "total", len(cleanList), "allNodes", len(allNodes))
}
return nodes.Keys()

return dirtyList, cleanList
}
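
The two returned slices partition the full set of known nodes, so the resync loop can pick a heavyweight or a lightweight refresh per partition. Below is a minimal standalone sketch of the same partition arithmetic using the apimachinery sets package; the node names are made up for illustration:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// every node currently known to the cache
	allNodes := sets.New[string]("node-0", "node-1", "node-2", "node-3")
	// nodes previously discarded: overreserved, or hosting foreign pods
	dirtyList := []string{"node-1", "node-3"}

	// Delete mutates the set and returns it, so what remains is the clean partition.
	cleanList := allNodes.Delete(dirtyList...).UnsortedList()

	fmt.Println(len(dirtyList), len(cleanList)) // 2 2: together they cover all 4 known nodes
}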

// Resync implements the cache resync loop step. This function checks if the latest available NRT information received matches the
@@ -194,24 +206,27 @@ func (ov *OverReserve) Resync() {
// we are not working with a specific pod, so we need a unique key to track this flow
logID := logIDFromTime()

nodeNames := ov.NodesMaybeOverReserved(logID)
// avoid as much as we can unnecessary work and logs.
if len(nodeNames) == 0 {
klog.V(6).InfoS("nrtcache: resync: no dirty nodes detected")
return
}
klog.V(6).InfoS("nrtcache: resync NodeTopology cache starting", "logID", logID)
defer klog.V(6).InfoS("nrtcache: resync NodeTopology cache complete", "logID", logID)

dirtyNodes, cleanNodes := ov.GetNodesOverReservationStatus(logID)
dirtyUpdates := ov.computeResyncByPods(logID, dirtyNodes)
cleanUpdates := ov.makeResyncMap(logID, cleanNodes)

ov.FlushNodes(logID, dirtyUpdates, cleanUpdates)
}

// computeResyncByPods must not require additional locking for the `OverReserve` data structures
func (ov *OverReserve) computeResyncByPods(logID string, nodeNames []string) map[string]*topologyv1alpha2.NodeResourceTopology {
nrtUpdatesMap := make(map[string]*topologyv1alpha2.NodeResourceTopology)

// node -> pod identifier (namespace, name)
nodeToObjsMap, err := makeNodeToPodDataMap(ov.podLister, ov.isPodRelevant, logID)
if err != nil {
klog.ErrorS(err, "cannot find the mapping between running pods and nodes")
return
return nrtUpdatesMap
}

klog.V(6).InfoS("nrtcache: resync NodeTopology cache starting", "logID", logID)
defer klog.V(6).InfoS("nrtcache: resync NodeTopology cache complete", "logID", logID)

var nrtUpdates []*topologyv1alpha2.NodeResourceTopology
for _, nodeName := range nodeNames {
nrtCandidate := &topologyv1alpha2.NodeResourceTopology{}
if err := ov.client.Get(context.Background(), types.NamespacedName{Name: nodeName}, nrtCandidate); err != nil {
@@ -251,23 +266,55 @@ }
}

klog.V(4).InfoS("nrtcache: overriding cached info", "logID", logID, "node", nodeName)
nrtUpdates = append(nrtUpdates, nrtCandidate)
nrtUpdatesMap[nodeName] = nrtCandidate
}

return nrtUpdatesMap
}

func (ov *OverReserve) makeResyncMap(logID string, nodeNames []string) map[string]*topologyv1alpha2.NodeResourceTopology {
nrtUpdatesMap := make(map[string]*topologyv1alpha2.NodeResourceTopology)

for _, nodeName := range nodeNames {
nrtCandidate := &topologyv1alpha2.NodeResourceTopology{}
err := ov.client.Get(context.Background(), types.NamespacedName{Name: nodeName}, nrtCandidate)
if err != nil {
klog.V(3).InfoS("nrtcache: failed to get NodeTopology", "logID", logID, "node", nodeName, "error", err)
continue
}
if nrtCandidate == nil {
klog.V(3).InfoS("nrtcache: missing NodeTopology", "logID", logID, "node", nodeName)
continue
}

klog.V(4).InfoS("nrtcache: overriding cached info", "logID", logID, "node", nodeName)
nrtUpdatesMap[nodeName] = nrtCandidate
}

ov.FlushNodes(logID, nrtUpdates...)
return nrtUpdatesMap
}

// FlushNodes drops all the cached information about the given dirty nodes, resetting their state to clean, and refreshes the cached data of the clean nodes only when their attributes changed.
func (ov *OverReserve) FlushNodes(logID string, nrts ...*topologyv1alpha2.NodeResourceTopology) {
func (ov *OverReserve) FlushNodes(logID string, dirtyUpdates, cleanUpdates map[string]*topologyv1alpha2.NodeResourceTopology) {
ov.lock.Lock()
defer ov.lock.Unlock()
for _, nrt := range nrts {
for _, nrt := range dirtyUpdates {
klog.V(4).InfoS("nrtcache: flushing", "logID", logID, "node", nrt.Name)
ov.nrts.Update(nrt)
delete(ov.assumedResources, nrt.Name)
ov.nodesMaybeOverreserved.Delete(nrt.Name)
ov.nodesWithForeignPods.Delete(nrt.Name)
}
for _, nrt := range cleanUpdates {
klog.V(4).InfoS("nrtcache: refreshing", "logID", logID, "node", nrt.Name)
ov.nrts.UpdateIfNeeded(nrt, areAttrsChanged)
}
}

func areAttrsChanged(oldNrt, newNrt *topologyv1alpha2.NodeResourceTopology) bool {
oldConf := nodeconfig.TopologyManagerFromNodeResourceTopology(oldNrt)
newConf := nodeconfig.TopologyManagerFromNodeResourceTopology(newNrt)
return !oldConf.Equal(newConf)
}
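
A package-local sketch of what the predicate is meant to detect: two NRT objects that differ only in a topology manager attribute should be reported as changed. The helper below is hypothetical, and the attribute names are assumptions about what the nodeconfig package parses:

package cache

import (
	topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exampleAttrsChanged is an illustrative helper, not part of this commit.
func exampleAttrsChanged() bool {
	oldNrt := &topologyv1alpha2.NodeResourceTopology{
		ObjectMeta: metav1.ObjectMeta{Name: "node-0"},
		Attributes: topologyv1alpha2.AttributeList{
			// assumed attribute names carrying the kubelet topology manager configuration
			{Name: "topologyManagerScope", Value: "container"},
			{Name: "topologyManagerPolicy", Value: "single-numa-node"},
		},
	}
	newNrt := oldNrt.DeepCopy()
	newNrt.Attributes[1].Value = "restricted" // the policy changed on the node

	// expected true: the topology manager configuration extracted from the attributes differs
	return areAttrsChanged(oldNrt, newNrt)
}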

// to be used only in tests
29 changes: 16 additions & 13 deletions pkg/noderesourcetopology/cache/overreserve_test.go
@@ -115,19 +115,19 @@ func TestInitEmptyLister(t *testing.T) {
}
}

func TestNodesMaybeOverReservedCount(t *testing.T) {
func TestGetNodesOverReservationStatusCount(t *testing.T) {
fakeClient, err := tu.NewFakeClient()
if err != nil {
t.Fatal(err)
}

fakePodLister := &fakePodLister{}

nrtCache := mustOverReserve(t, fakeClient, fakePodLister)
dirtyNodes := nrtCache.NodesMaybeOverReserved("testing")
dirtyNodes, _ := nrtCache.GetNodesOverReservationStatus("testing")
if len(dirtyNodes) != 0 {
t.Errorf("dirty nodes from pristine cache: %v", dirtyNodes)
}
// TODO: validate cleanNodes
}

func TestDirtyNodesMarkDiscarded(t *testing.T) {
@@ -149,7 +149,7 @@ func TestDirtyNodesMarkDiscarded(t *testing.T) {
nrtCache.ReserveNodeResources(nodeName, &corev1.Pod{})
}

dirtyNodes := nrtCache.NodesMaybeOverReserved("testing")
dirtyNodes, _ := nrtCache.GetNodesOverReservationStatus("testing")
if len(dirtyNodes) != 0 {
t.Errorf("dirty nodes from pristine cache: %v", dirtyNodes)
}
@@ -158,7 +158,7 @@ func TestDirtyNodesUnmarkedOnReserve(t *testing.T) {
nrtCache.NodeMaybeOverReserved(nodeName, &corev1.Pod{})
}

dirtyNodes = nrtCache.NodesMaybeOverReserved("testing")
dirtyNodes, _ = nrtCache.GetNodesOverReservationStatus("testing")
sort.Strings(dirtyNodes)

if !reflect.DeepEqual(dirtyNodes, expectedNodes) {
@@ -185,7 +185,7 @@ func TestDirtyNodesUnmarkedOnReserve(t *testing.T) {
nrtCache.ReserveNodeResources(nodeName, &corev1.Pod{})
}

dirtyNodes := nrtCache.NodesMaybeOverReserved("testing")
dirtyNodes, _ := nrtCache.GetNodesOverReservationStatus("testing")
if len(dirtyNodes) != 0 {
t.Errorf("dirty nodes from pristine cache: %v", dirtyNodes)
}
@@ -201,7 +201,7 @@ func TestDirtyNodesUnmarkedOnReserve(t *testing.T) {
"node-1",
}

dirtyNodes = nrtCache.NodesMaybeOverReserved("testing")
dirtyNodes, _ = nrtCache.GetNodesOverReservationStatus("testing")

if !reflect.DeepEqual(dirtyNodes, expectedNodes) {
t.Errorf("got=%v expected=%v", dirtyNodes, expectedNodes)
@@ -433,9 +433,12 @@ func TestFlush(t *testing.T) {
},
}

nrtCache.FlushNodes(logID, expectedNodeTopology.DeepCopy())
updatesMap := map[string]*topologyv1alpha2.NodeResourceTopology{
expectedNodeTopology.Name: expectedNodeTopology.DeepCopy(),
}
nrtCache.FlushNodes(logID, updatesMap, nil)

dirtyNodes := nrtCache.NodesMaybeOverReserved("testing")
dirtyNodes, _ := nrtCache.GetNodesOverReservationStatus("testing")
if len(dirtyNodes) != 0 {
t.Errorf("dirty nodes after flush: %v", dirtyNodes)
}
@@ -517,7 +520,7 @@ func TestResyncNoPodFingerprint(t *testing.T) {

nrtCache.Resync()

dirtyNodes := nrtCache.NodesMaybeOverReserved("testing")
dirtyNodes, _ := nrtCache.GetNodesOverReservationStatus("testing")

if len(dirtyNodes) != 1 || dirtyNodes[0] != "node1" {
t.Errorf("cleaned nodes after resyncing with bad data: %v", dirtyNodes)
@@ -611,7 +614,7 @@ func TestResyncMatchFingerprint(t *testing.T) {

nrtCache.Resync()

dirtyNodes := nrtCache.NodesMaybeOverReserved("testing")
dirtyNodes, _ := nrtCache.GetNodesOverReservationStatus("testing")
if len(dirtyNodes) > 0 {
t.Errorf("node still dirty after resyncing with good data: %v", dirtyNodes)
}
@@ -641,7 +644,7 @@ func TestUnknownNodeWithForeignPods(t *testing.T) {

nrtCache.NodeHasForeignPods("node-bogus", &corev1.Pod{})

names := nrtCache.NodesMaybeOverReserved("testing")
names, _ := nrtCache.GetNodesOverReservationStatus("testing")
if len(names) != 0 {
t.Errorf("non-existent node has foreign pods!")
}
@@ -714,7 +717,7 @@ func TestNodeWithForeignPods(t *testing.T) {
target := "node2"
nrtCache.NodeHasForeignPods(target, &corev1.Pod{})

names := nrtCache.NodesMaybeOverReserved("testing")
names, _ := nrtCache.GetNodesOverReservationStatus("testing")
if len(names) != 1 || names[0] != target {
t.Errorf("unexpected dirty nodes: %v", names)
}
26 changes: 26 additions & 0 deletions pkg/noderesourcetopology/cache/store.go
@@ -72,6 +72,32 @@ func (nrs *nrtStore) Update(nrt *topologyv1alpha2.NodeResourceTopology) {
klog.V(5).InfoS("nrtcache: updated cached NodeTopology", "node", nrt.Name)
}

func (nrs *nrtStore) UpdateIfNeeded(nrt *topologyv1alpha2.NodeResourceTopology, isNeeded func(oldNrt, newNrt *topologyv1alpha2.NodeResourceTopology) bool) {
cur, ok := nrs.data[nrt.Name]
// if we do NOT have previous data, we surely need an update.
if ok && !isNeeded(cur, nrt) {
return
}
nrs.Update(nrt)
}

func neverNeeded(oldNrt, newNrt *topologyv1alpha2.NodeResourceTopology) bool {
return false
}

func alwaysNeeded(oldNrt, newNrt *topologyv1alpha2.NodeResourceTopology) bool {
return true
}

// NodeNames returns the names of all the NRTs present in the store, with no ordering guarantee
func (nrs *nrtStore) NodeNames() []string {
nodeNames := make([]string, 0, len(nrs.data))
for name := range nrs.data {
nodeNames = append(nodeNames, name)
}
return nodeNames
}
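
For reference, here is a hypothetical in-package helper sketching how the predicates drive UpdateIfNeeded: a missing entry is always filled, while the predicate alone decides whether an existing entry is overwritten.

// exampleUpdateIfNeeded is an illustrative helper, not part of this commit.
func exampleUpdateIfNeeded(nrs *nrtStore, fresh *topologyv1alpha2.NodeResourceTopology) {
	// behaves exactly like Update: the predicate always requests a refresh
	nrs.UpdateIfNeeded(fresh, alwaysNeeded)

	// only fills the entry when nothing is cached yet for fresh.Name;
	// an existing entry is never overwritten
	nrs.UpdateIfNeeded(fresh, neverNeeded)

	// the resync flow uses this for clean nodes: overwrite the cached object
	// only when the topology manager attributes differ (see overreserve.go)
	nrs.UpdateIfNeeded(fresh, areAttrsChanged)
}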

// resourceStore maps the resources requested by each pod, keyed by the pod's namespaced name. It is not thread safe and needs to be protected by a lock.
type resourceStore struct {
// key: namespace + "/" name
