diff --git a/pkg/noderesourcetopology/cache/overreserve.go b/pkg/noderesourcetopology/cache/overreserve.go
index 3c373e9f2..b81386fd3 100644
--- a/pkg/noderesourcetopology/cache/overreserve.go
+++ b/pkg/noderesourcetopology/cache/overreserve.go
@@ -29,12 +29,14 @@ import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
	podlisterv1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/klog/v2"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
	apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
+	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/nodeconfig"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify"
@@ -153,33 +155,43 @@ func (ov *OverReserve) UnreserveNodeResources(nodeName string, pod *corev1.Pod)
	klog.V(5).InfoS("nrtcache post release", "logID", klog.KObj(pod), "node", nodeName, "assumedResources", nodeAssumedResources.String())
}

-// NodesMaybeOverReserved returns a slice of all the node names which have been discarded previously,
-// so which are supposed to be `dirty` in the cache.
+// GetNodesOverReservationStatus returns a slice of all the node names which have been discarded previously,
+// and are thus considered `dirty` in the cache, plus a slice with all the other known nodes, which are expected to be clean.
+// The union of the returned slices must be the total set of known nodes.
// A node can be discarded for two reasons:
// 1. it legitimately cannot fit containers because it does not have enough free resources
// 2. it was pessimistically overallocated, so the node is a candidate for resync
// This function enables the caller to know which nodes should be considered for resync,
// avoiding the need to rescan the full node list.
-func (ov *OverReserve) NodesMaybeOverReserved(logID string) []string {
+func (ov *OverReserve) GetNodesOverReservationStatus(logID string) ([]string, []string) {
	ov.lock.Lock()
	defer ov.lock.Unlock()
+
+	allNodes := sets.New[string](ov.nrts.NodeNames()...)
	// this is intentionally aggressive. We don't yet make any attempt to find out if the
	// node was discarded because pessimistically overreserved (which should indeed trigger
	// a resync) or if it was discarded because the actual resources on the node really were
	// exhausted. We do it like this because this is the safest approach. We will optimize
	// the node selection logic later on to make the resync procedure less aggressive but
	// still correct.
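	// The block below builds the `dirty` set from the nodes flagged as hosting foreign pods plus
	// the nodes marked as possibly overreserved; every other node currently cached in ov.nrts is
	// returned as `clean`, so together the two slices cover all the known nodes.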
- nodes := ov.nodesWithForeignPods.Clone() - foreignCount := nodes.Len() + dirtyNodes := ov.nodesWithForeignPods.Clone() + foreignCount := dirtyNodes.Len() for _, node := range ov.nodesMaybeOverreserved.Keys() { - nodes.Incr(node) + dirtyNodes.Incr(node) + } + + dirtyList := dirtyNodes.Keys() + if len(dirtyList) > 0 { + klog.V(4).InfoS("nrtcache: found dirty nodes", "logID", logID, "foreign", foreignCount, "discarded", len(dirtyList)-foreignCount, "total", len(dirtyList), "allNodes", len(allNodes)) } - if nodes.Len() > 0 { - klog.V(4).InfoS("nrtcache: found dirty nodes", "logID", logID, "foreign", foreignCount, "discarded", nodes.Len()-foreignCount, "total", nodes.Len()) + cleanList := allNodes.Delete(dirtyList...).UnsortedList() + if len(cleanList) > 0 { + klog.V(4).InfoS("nrtcache: found clean nodes", "logID", logID, "dirtyCount", len(dirtyList), "total", len(cleanList), "allNodes", len(allNodes)) } - return nodes.Keys() + + return dirtyList, cleanList } // Resync implements the cache resync loop step. This function checks if the latest available NRT information received matches the @@ -194,24 +206,27 @@ func (ov *OverReserve) Resync() { // we are not working with a specific pod, so we need a unique key to track this flow logID := logIDFromTime() - nodeNames := ov.NodesMaybeOverReserved(logID) - // avoid as much as we can unnecessary work and logs. - if len(nodeNames) == 0 { - klog.V(6).InfoS("nrtcache: resync: no dirty nodes detected") - return - } + klog.V(6).InfoS("nrtcache: resync NodeTopology cache starting", "logID", logID) + defer klog.V(6).InfoS("nrtcache: resync NodeTopology cache complete", "logID", logID) + + dirtyNodes, cleanNodes := ov.GetNodesOverReservationStatus(logID) + dirtyUpdates := ov.computeResyncByPods(logID, dirtyNodes) + cleanUpdates := ov.makeResyncMap(logID, cleanNodes) + + ov.FlushNodes(logID, dirtyUpdates, cleanUpdates) +} + +// computeResyncByPods must not require additiona locking for the `OverReserve` data structures +func (ov *OverReserve) computeResyncByPods(logID string, nodeNames []string) map[string]*topologyv1alpha2.NodeResourceTopology { + nrtUpdatesMap := make(map[string]*topologyv1alpha2.NodeResourceTopology) // node -> pod identifier (namespace, name) nodeToObjsMap, err := makeNodeToPodDataMap(ov.podLister, ov.isPodRelevant, logID) if err != nil { klog.ErrorS(err, "cannot find the mapping between running pods and nodes") - return + return nrtUpdatesMap } - klog.V(6).InfoS("nrtcache: resync NodeTopology cache starting", "logID", logID) - defer klog.V(6).InfoS("nrtcache: resync NodeTopology cache complete", "logID", logID) - - var nrtUpdates []*topologyv1alpha2.NodeResourceTopology for _, nodeName := range nodeNames { nrtCandidate := &topologyv1alpha2.NodeResourceTopology{} if err := ov.client.Get(context.Background(), types.NamespacedName{Name: nodeName}, nrtCandidate); err != nil { @@ -251,23 +266,55 @@ func (ov *OverReserve) Resync() { } klog.V(4).InfoS("nrtcache: overriding cached info", "logID", logID, "node", nodeName) - nrtUpdates = append(nrtUpdates, nrtCandidate) + nrtUpdatesMap[nodeName] = nrtCandidate + } + + return nrtUpdatesMap +} + +func (ov *OverReserve) makeResyncMap(logID string, nodeNames []string) map[string]*topologyv1alpha2.NodeResourceTopology { + nrtUpdatesMap := make(map[string]*topologyv1alpha2.NodeResourceTopology) + + for _, nodeName := range nodeNames { + nrtCandidate := &topologyv1alpha2.NodeResourceTopology{} + err := ov.client.Get(context.Background(), types.NamespacedName{Name: nodeName}, nrtCandidate) + if err != 
nil { + klog.V(3).InfoS("nrtcache: failed to get NodeTopology", "logID", logID, "node", nodeName, "error", err) + continue + } + if nrtCandidate == nil { + klog.V(3).InfoS("nrtcache: missing NodeTopology", "logID", logID, "node", nodeName) + continue + } + + klog.V(4).InfoS("nrtcache: overriding cached info", "logID", logID, "node", nodeName) + nrtUpdatesMap[nodeName] = nrtCandidate } - ov.FlushNodes(logID, nrtUpdates...) + return nrtUpdatesMap } // FlushNodes drops all the cached information about a given node, resetting its state clean. -func (ov *OverReserve) FlushNodes(logID string, nrts ...*topologyv1alpha2.NodeResourceTopology) { +func (ov *OverReserve) FlushNodes(logID string, dirtyUpdates, cleanUpdates map[string]*topologyv1alpha2.NodeResourceTopology) { ov.lock.Lock() defer ov.lock.Unlock() - for _, nrt := range nrts { + for _, nrt := range dirtyUpdates { klog.V(4).InfoS("nrtcache: flushing", "logID", logID, "node", nrt.Name) ov.nrts.Update(nrt) delete(ov.assumedResources, nrt.Name) ov.nodesMaybeOverreserved.Delete(nrt.Name) ov.nodesWithForeignPods.Delete(nrt.Name) } + for _, nrt := range cleanUpdates { + klog.V(4).InfoS("nrtcache: refreshing", "logID", logID, "node", nrt.Name) + ov.nrts.UpdateIfNeeded(nrt, areAttrsChanged) + } +} + +func areAttrsChanged(oldNrt, newNrt *topologyv1alpha2.NodeResourceTopology) bool { + oldConf := nodeconfig.TopologyManagerFromNodeResourceTopology(oldNrt) + newConf := nodeconfig.TopologyManagerFromNodeResourceTopology(newNrt) + return !oldConf.Equal(newConf) } // to be used only in tests diff --git a/pkg/noderesourcetopology/cache/overreserve_test.go b/pkg/noderesourcetopology/cache/overreserve_test.go index ec48e2bae..689d5e115 100644 --- a/pkg/noderesourcetopology/cache/overreserve_test.go +++ b/pkg/noderesourcetopology/cache/overreserve_test.go @@ -115,19 +115,19 @@ func TestInitEmptyLister(t *testing.T) { } } -func TestNodesMaybeOverReservedCount(t *testing.T) { +func TestGetNodesOverReservationStatusCount(t *testing.T) { fakeClient, err := tu.NewFakeClient() if err != nil { t.Fatal(err) } - fakePodLister := &fakePodLister{} nrtCache := mustOverReserve(t, fakeClient, fakePodLister) - dirtyNodes := nrtCache.NodesMaybeOverReserved("testing") + dirtyNodes, _ := nrtCache.GetNodesOverReservationStatus("testing") if len(dirtyNodes) != 0 { t.Errorf("dirty nodes from pristine cache: %v", dirtyNodes) } + // TODO: validate cleanNodes } func TestDirtyNodesMarkDiscarded(t *testing.T) { @@ -149,7 +149,7 @@ func TestDirtyNodesMarkDiscarded(t *testing.T) { nrtCache.ReserveNodeResources(nodeName, &corev1.Pod{}) } - dirtyNodes := nrtCache.NodesMaybeOverReserved("testing") + dirtyNodes, _ := nrtCache.GetNodesOverReservationStatus("testing") if len(dirtyNodes) != 0 { t.Errorf("dirty nodes from pristine cache: %v", dirtyNodes) } @@ -158,7 +158,7 @@ func TestDirtyNodesMarkDiscarded(t *testing.T) { nrtCache.NodeMaybeOverReserved(nodeName, &corev1.Pod{}) } - dirtyNodes = nrtCache.NodesMaybeOverReserved("testing") + dirtyNodes, _ = nrtCache.GetNodesOverReservationStatus("testing") sort.Strings(dirtyNodes) if !reflect.DeepEqual(dirtyNodes, expectedNodes) { @@ -185,7 +185,7 @@ func TestDirtyNodesUnmarkedOnReserve(t *testing.T) { nrtCache.ReserveNodeResources(nodeName, &corev1.Pod{}) } - dirtyNodes := nrtCache.NodesMaybeOverReserved("testing") + dirtyNodes, _ := nrtCache.GetNodesOverReservationStatus("testing") if len(dirtyNodes) != 0 { t.Errorf("dirty nodes from pristine cache: %v", dirtyNodes) } @@ -201,7 +201,7 @@ func TestDirtyNodesUnmarkedOnReserve(t *testing.T) { 
"node-1", } - dirtyNodes = nrtCache.NodesMaybeOverReserved("testing") + dirtyNodes, _ = nrtCache.GetNodesOverReservationStatus("testing") if !reflect.DeepEqual(dirtyNodes, expectedNodes) { t.Errorf("got=%v expected=%v", dirtyNodes, expectedNodes) @@ -433,9 +433,12 @@ func TestFlush(t *testing.T) { }, } - nrtCache.FlushNodes(logID, expectedNodeTopology.DeepCopy()) + updatesMap := map[string]*topologyv1alpha2.NodeResourceTopology{ + expectedNodeTopology.Name: expectedNodeTopology.DeepCopy(), + } + nrtCache.FlushNodes(logID, updatesMap, nil) - dirtyNodes := nrtCache.NodesMaybeOverReserved("testing") + dirtyNodes, _ := nrtCache.GetNodesOverReservationStatus("testing") if len(dirtyNodes) != 0 { t.Errorf("dirty nodes after flush: %v", dirtyNodes) } @@ -517,7 +520,7 @@ func TestResyncNoPodFingerprint(t *testing.T) { nrtCache.Resync() - dirtyNodes := nrtCache.NodesMaybeOverReserved("testing") + dirtyNodes, _ := nrtCache.GetNodesOverReservationStatus("testing") if len(dirtyNodes) != 1 || dirtyNodes[0] != "node1" { t.Errorf("cleaned nodes after resyncing with bad data: %v", dirtyNodes) @@ -611,7 +614,7 @@ func TestResyncMatchFingerprint(t *testing.T) { nrtCache.Resync() - dirtyNodes := nrtCache.NodesMaybeOverReserved("testing") + dirtyNodes, _ := nrtCache.GetNodesOverReservationStatus("testing") if len(dirtyNodes) > 0 { t.Errorf("node still dirty after resyncing with good data: %v", dirtyNodes) } @@ -641,7 +644,7 @@ func TestUnknownNodeWithForeignPods(t *testing.T) { nrtCache.NodeHasForeignPods("node-bogus", &corev1.Pod{}) - names := nrtCache.NodesMaybeOverReserved("testing") + names, _ := nrtCache.GetNodesOverReservationStatus("testing") if len(names) != 0 { t.Errorf("non-existent node has foreign pods!") } @@ -714,7 +717,7 @@ func TestNodeWithForeignPods(t *testing.T) { target := "node2" nrtCache.NodeHasForeignPods(target, &corev1.Pod{}) - names := nrtCache.NodesMaybeOverReserved("testing") + names, _ := nrtCache.GetNodesOverReservationStatus("testing") if len(names) != 1 || names[0] != target { t.Errorf("unexpected dirty nodes: %v", names) } diff --git a/pkg/noderesourcetopology/cache/store.go b/pkg/noderesourcetopology/cache/store.go index 8ff78b321..350f69cf2 100644 --- a/pkg/noderesourcetopology/cache/store.go +++ b/pkg/noderesourcetopology/cache/store.go @@ -72,6 +72,32 @@ func (nrs *nrtStore) Update(nrt *topologyv1alpha2.NodeResourceTopology) { klog.V(5).InfoS("nrtcache: updated cached NodeTopology", "node", nrt.Name) } +func (nrs *nrtStore) UpdateIfNeeded(nrt *topologyv1alpha2.NodeResourceTopology, isNeeded func(oldNrt, newNrt *topologyv1alpha2.NodeResourceTopology) bool) { + cur, ok := nrs.data[nrt.Name] + // if we do NOT have previous data, we surely need an update. + if ok && !isNeeded(cur, nrt) { + return + } + nrs.Update(nrt) +} + +func neverNeeded(oldNrt, newNrt *topologyv1alpha2.NodeResourceTopology) bool { + return false +} + +func alwaysNeeded(oldNrt, newNrt *topologyv1alpha2.NodeResourceTopology) bool { + return true +} + +// NodeNames return the names of all the NRTs present in the store with no ordering guarantee +func (nrs *nrtStore) NodeNames() []string { + nodeNames := make([]string, 0, len(nrs.data)) + for name := range nrs.data { + nodeNames = append(nodeNames, name) + } + return nodeNames +} + // resourceStore maps the resource requested by pod by pod namespaed name. It is not thread safe and needs to be protected by a lock. 
type resourceStore struct { // key: namespace + "/" name diff --git a/pkg/noderesourcetopology/cache/store_test.go b/pkg/noderesourcetopology/cache/store_test.go index 30871d450..a51f9d25a 100644 --- a/pkg/noderesourcetopology/cache/store_test.go +++ b/pkg/noderesourcetopology/cache/store_test.go @@ -23,7 +23,6 @@ import ( "sort" "testing" - topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,6 +30,9 @@ import ( podlisterv1 "k8s.io/client-go/listers/core/v1" apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" + topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" + nrtv1alpha2attr "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2/helper/attribute" + "github.com/k8stopologyawareschedwg/podfingerprint" ) @@ -219,6 +221,290 @@ func TestNRTStoreGet(t *testing.T) { } } +func TestNRTStoreNodeNames(t *testing.T) { + tcases := []struct { + description string + nrts []topologyv1alpha2.NodeResourceTopology + expectedNames []string + }{ + { + description: "empty", + expectedNames: []string{}, + }, + { + description: "simple names", + nrts: []topologyv1alpha2.NodeResourceTopology{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + }, + }, + expectedNames: []string{"node-0", "node-1"}, + }, + { + description: "longish random name list", + nrts: []topologyv1alpha2.NodeResourceTopology{ + {ObjectMeta: metav1.ObjectMeta{Name: "node-0"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-9"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-8"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-2"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-3"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-4"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-5"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-7"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node-6"}}, + }, + expectedNames: []string{"node-0", "node-1", "node-2", "node-3", "node-4", "node-5", "node-6", "node-7", "node-8", "node-9"}, + }, + } + + for _, tcase := range tcases { + t.Run(tcase.description, func(t *testing.T) { + ns := newNrtStore(tcase.nrts) + names := ns.NodeNames() + sort.Strings(names) + if !reflect.DeepEqual(names, tcase.expectedNames) { + t.Fatalf("mismatching names: got %v expected %v", names, tcase.expectedNames) + } + }) + } +} + +func TestNRTStoreUpdateIfNeeded(t *testing.T) { + + tcases := []struct { + description string + nrts []topologyv1alpha2.NodeResourceTopology + candidate *topologyv1alpha2.NodeResourceTopology + isNeeded func(oldNrt, newNrt *topologyv1alpha2.NodeResourceTopology) bool + expected *topologyv1alpha2.NodeResourceTopology + }{ + { + description: "never needed when empty", + nrts: []topologyv1alpha2.NodeResourceTopology{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + }, + }, + candidate: &topologyv1alpha2.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Attributes: []topologyv1alpha2.AttributeInfo{ + { + Name: "foo", + Value: "bar", + }, + }, + }, + isNeeded: neverNeeded, + expected: &topologyv1alpha2.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Attributes: []topologyv1alpha2.AttributeInfo{ + { + Name: "foo", + Value: "bar", + }, + }, + }, + }, + { + description: 
"never needed", + nrts: []topologyv1alpha2.NodeResourceTopology{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + }, + }, + candidate: &topologyv1alpha2.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Attributes: []topologyv1alpha2.AttributeInfo{ + { + Name: "foo", + Value: "bar", + }, + }, + }, + isNeeded: neverNeeded, + expected: &topologyv1alpha2.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + }, + }, + { + description: "always needed", + nrts: []topologyv1alpha2.NodeResourceTopology{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + }, + }, + candidate: &topologyv1alpha2.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Attributes: []topologyv1alpha2.AttributeInfo{ + { + Name: "foo", + Value: "bar", + }, + }, + }, + isNeeded: alwaysNeeded, + expected: &topologyv1alpha2.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Attributes: []topologyv1alpha2.AttributeInfo{ + { + Name: "foo", + Value: "bar", + }, + }, + }, + }, + { + description: "triggers guard", + nrts: []topologyv1alpha2.NodeResourceTopology{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Attributes: []topologyv1alpha2.AttributeInfo{ + { + Name: "foo", + Value: "foo", + }, + }, + }, + }, + candidate: &topologyv1alpha2.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Attributes: []topologyv1alpha2.AttributeInfo{ + { + Name: "foo", + Value: "bar", + }, + }, + }, + isNeeded: func(oldNrt, newNrt *topologyv1alpha2.NodeResourceTopology) bool { + if oldNrt == nil { + return true + } + if newNrt == nil { + return false + } + val, ok := nrtv1alpha2attr.Get(oldNrt.Attributes, "foo") + if !ok { + return true + } + return val.Value == "foo" + }, + expected: &topologyv1alpha2.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Attributes: []topologyv1alpha2.AttributeInfo{ + { + Name: "foo", + Value: "bar", + }, + }, + }, + }, + { + description: "failed guard", + nrts: []topologyv1alpha2.NodeResourceTopology{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Attributes: []topologyv1alpha2.AttributeInfo{ + { + Name: "foo", + Value: "bar", + }, + }, + }, + }, + candidate: &topologyv1alpha2.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Attributes: []topologyv1alpha2.AttributeInfo{ + { + Name: "foo", + Value: "zap", + }, + }, + }, + isNeeded: func(oldNrt, newNrt *topologyv1alpha2.NodeResourceTopology) bool { + if oldNrt == nil { + return true + } + if newNrt == nil { + return false + } + val, ok := nrtv1alpha2attr.Get(oldNrt.Attributes, "foo") + if !ok { + return true + } + return val.Value == "foo" + }, + expected: &topologyv1alpha2.NodeResourceTopology{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Attributes: []topologyv1alpha2.AttributeInfo{ + { + Name: "foo", + Value: "bar", + }, + }, + }, + }, + } + + for _, tcase := range tcases { + t.Run(tcase.description, func(t *testing.T) { + ns := newNrtStore(tcase.nrts) + ns.UpdateIfNeeded(tcase.candidate, tcase.isNeeded) + got := ns.GetNRTCopyByNodeName(tcase.candidate.Name) + if !reflect.DeepEqual(got, tcase.expected) { + t.Fatalf("updateIfNeeded inconsistent result got %v expected %v", got, tcase.expected) + } + }) + } +} + func TestNRTStoreUpdate(t *testing.T) { nrts := 
[]topologyv1alpha2.NodeResourceTopology{ {