diff --git a/pkg/noderesourcetopology/cache/cache.go b/pkg/noderesourcetopology/cache/cache.go
index dcb176ff9..ad05338c8 100644
--- a/pkg/noderesourcetopology/cache/cache.go
+++ b/pkg/noderesourcetopology/cache/cache.go
@@ -24,6 +24,20 @@ import (
 	topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
 )
 
+type CachedNRTInfo struct {
+	// Generation is akin to the object resourceVersion and represents
+	// the observed state in the cache. It's an opaque, monotonically increasing number which can only be compared
+	// for equality and which is only increased in the resync loop. It is used to cross-correlate resync attempts
+	// with cache content. Used only in logging. If the cache implementation has no concept of caching nor generation,
+	// it should always return 0 (zero).
+	Generation uint64
+
+	// Fresh signals the caller if the NRT data is fresh.
+	// If true, the data is fresh and ready to be consumed.
+	// If false, the data is stale and the caller needs to wait for a future refresh.
+	Fresh bool
+}
+
 type Interface interface {
 	// GetCachedNRTCopy retrieves a NRT copy from cache, and then deducts over-reserved resources if necessary.
 	// It will be used as the source of truth across the Pod's scheduling cycle.
@@ -31,10 +45,8 @@ type Interface interface {
 	// of NRT pertaining to the same node, pessimistically overallocated on ALL the NUMA zones of the node.
 	// The pod argument is used only for logging purposes.
 	// Returns nil if there is no NRT data available for the node named `nodeName`.
-	// Returns a boolean to signal the caller if the NRT data is fresh.
-	// If true, the data is fresh and ready to be consumed.
-	// If false, the data is stale and the caller need to wait for a future refresh.
-	GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool)
+	// Returns a CachedNRTInfo describing the NRT data returned; its fields are meaningful only if `nrt` != nil.
+	GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo)
 
 	// NodeMaybeOverReserved declares a node was filtered out for not enough resources available.
 	// This means this node is eligible for a resync. When a node is marked discarded (dirty), it matters not
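Reviewer note: a minimal caller-side sketch of the new contract, mirroring what the filter.go and score.go hunks below do. It is not part of the change; the package name, the helper `consumeNRT`, and the `sigs.k8s.io/scheduler-plugins` module path in the imports are assumptions for illustration only.

```go
package example

import (
	"context"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/logging"
)

// consumeNRT sketches the intended calling convention: attach the generation
// to the log context for cross-correlation, then only trust the NRT object
// when info.Fresh is true and nrt is non-nil.
func consumeNRT(ctx context.Context, lh logr.Logger, nrtCache cache.Interface, nodeName string, pod *corev1.Pod) {
	nrt, info := nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
	lh = lh.WithValues(logging.KeyGeneration, info.Generation)
	if !info.Fresh {
		// stale data: the caller is expected to retry on a future refresh
		lh.V(2).Info("stale NRT data", "node", nodeName)
		return
	}
	if nrt == nil {
		lh.V(2).Info("no NRT data available", "node", nodeName)
		return
	}
	lh.V(5).Info("using NRT data", "node", nodeName, "zones", len(nrt.Zones))
}
```

Caches with no notion of generation (Passthrough and DiscardReserved below) leave Generation at its zero value, so the extra log key is harmless there.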
diff --git a/pkg/noderesourcetopology/cache/cache_test.go b/pkg/noderesourcetopology/cache/cache_test.go
index 90c8a4086..9df645a78 100644
--- a/pkg/noderesourcetopology/cache/cache_test.go
+++ b/pkg/noderesourcetopology/cache/cache_test.go
@@ -100,10 +100,10 @@ func checkGetCachedNRTCopy(t *testing.T, makeCache func(client ctrlclient.WithWa
 		nrtCache.NodeHasForeignPods(tc.nodeName, pod)
 	}
 
-	gotNRT, gotOK := nrtCache.GetCachedNRTCopy(ctx, tc.nodeName, pod)
+	gotNRT, gotInfo := nrtCache.GetCachedNRTCopy(ctx, tc.nodeName, pod)
 
-	if gotOK != tc.expectedOK {
-		t.Fatalf("unexpected object status from cache: got: %v expected: %v", gotOK, tc.expectedOK)
+	if gotInfo.Fresh != tc.expectedOK {
+		t.Fatalf("unexpected object status from cache: got: %v expected: %v", gotInfo.Fresh, tc.expectedOK)
 	}
 	if gotNRT != nil && tc.expectedNRT == nil {
 		t.Fatalf("object from cache not nil but expected nil")
diff --git a/pkg/noderesourcetopology/cache/discardreserved.go b/pkg/noderesourcetopology/cache/discardreserved.go
index ff877f9ee..b59d8cb2f 100644
--- a/pkg/noderesourcetopology/cache/discardreserved.go
+++ b/pkg/noderesourcetopology/cache/discardreserved.go
@@ -58,20 +58,21 @@ func NewDiscardReserved(lh logr.Logger, client ctrlclient.Client) Interface {
 	}
 }
 
-func (pt *DiscardReserved) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
+func (pt *DiscardReserved) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
 	pt.rMutex.RLock()
 	defer pt.rMutex.RUnlock()
 	if t, ok := pt.reservationMap[nodeName]; ok {
 		if len(t) > 0 {
-			return nil, false
+			return nil, CachedNRTInfo{}
 		}
 	}
 
+	info := CachedNRTInfo{Fresh: true}
 	nrt := &topologyv1alpha2.NodeResourceTopology{}
 	if err := pt.client.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
-		return nil, true
+		return nil, info
 	}
-	return nrt, true
+	return nrt, info
 }
 
 func (pt *DiscardReserved) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {}
diff --git a/pkg/noderesourcetopology/cache/discardreserved_test.go b/pkg/noderesourcetopology/cache/discardreserved_test.go
index 63c763dca..46e86772f 100644
--- a/pkg/noderesourcetopology/cache/discardreserved_test.go
+++ b/pkg/noderesourcetopology/cache/discardreserved_test.go
@@ -66,8 +66,8 @@ func TestDiscardReservedNodesGetNRTCopyFails(t *testing.T) {
 		},
 	}
 
-	nrtObj, ok := nrtCache.GetCachedNRTCopy(context.Background(), "node1", &corev1.Pod{})
-	if ok {
+	nrtObj, nrtInfo := nrtCache.GetCachedNRTCopy(context.Background(), "node1", &corev1.Pod{})
+	if nrtInfo.Fresh {
 		t.Fatal("expected false\ngot true\n")
 	}
 	if nrtObj != nil {
diff --git a/pkg/noderesourcetopology/cache/overreserve.go b/pkg/noderesourcetopology/cache/overreserve.go
index 505ba955d..9cd5008c1 100644
--- a/pkg/noderesourcetopology/cache/overreserve.go
+++ b/pkg/noderesourcetopology/cache/overreserve.go
@@ -45,6 +45,7 @@ type OverReserve struct {
 	lh               logr.Logger
 	client           ctrlclient.Reader
 	lock             sync.Mutex
+	generation       uint64
 	nrts             *nrtStore
 	assumedResources map[string]*resourceStore // nodeName -> resourceStore
 	// nodesMaybeOverreserved counts how many times a node is filtered out. This is used as trigger condition to try
@@ -97,30 +98,33 @@ func NewOverReserve(ctx context.Context, lh logr.Logger, cfg *apiconfig.NodeReso
 	return obj, nil
 }
 
-func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
+func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
 	ov.lock.Lock()
 	defer ov.lock.Unlock()
 	if ov.nodesWithForeignPods.IsSet(nodeName) {
-		return nil, false
+		return nil, CachedNRTInfo{}
 	}
 
+	info := CachedNRTInfo{Fresh: true}
 	nrt := ov.nrts.GetNRTCopyByNodeName(nodeName)
 	if nrt == nil {
-		return nil, true
+		return nil, info
 	}
+
+	info.Generation = ov.generation
 	nodeAssumedResources, ok := ov.assumedResources[nodeName]
 	if !ok {
-		return nrt, true
+		return nrt, info
 	}
 
 	logID := klog.KObj(pod)
-	lh := ov.lh.WithValues(logging.KeyPod, logID, logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)
+	lh := ov.lh.WithValues(logging.KeyPod, logID, logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName, logging.KeyGeneration, ov.generation)
 
 	lh.V(6).Info("NRT", "fromcache", stringify.NodeResourceTopologyResources(nrt))
 	nodeAssumedResources.UpdateNRT(nrt, logging.KeyPod, logID)
 
 	lh.V(5).Info("NRT", "withassumed", stringify.NodeResourceTopologyResources(nrt))
-	return nrt, true
+	return nrt, info
 }
 
 func (ov *OverReserve) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {
@@ -176,6 +180,7 @@ func (ov *OverReserve) UnreserveNodeResources(nodeName string, pod *corev1.Pod)
 }
 
 type DesyncedNodes struct {
+	Generation        uint64
 	MaybeOverReserved []string
 	ConfigChanged     []string
 }
@@ -207,6 +212,10 @@ func (rn DesyncedNodes) DirtyCount() int {
 func (ov *OverReserve) GetDesyncedNodes(lh logr.Logger) DesyncedNodes {
 	ov.lock.Lock()
 	defer ov.lock.Unlock()
+
+	// make sure to log the generation to be able to cross-correlate with later logs
+	lh = lh.WithValues(logging.KeyGeneration, ov.generation)
+
 	// this is intentionally aggressive. We don't yet make any attempt to find out if the
 	// node was discarded because pessimistically overrserved (which should indeed trigger
 	// a resync) or if it was discarded because the actual resources on the node really were
@@ -229,6 +238,7 @@ func (ov *OverReserve) GetDesyncedNodes(lh logr.Logger) DesyncedNodes {
 		lh.V(4).Info("found dirty nodes", "foreign", foreignCount, "discarded", overreservedCount, "configChange", configChangeCount, "total", nodes.Len())
 	}
 	return DesyncedNodes{
+		Generation:        ov.generation,
 		MaybeOverReserved: nodes.Keys(),
 		ConfigChanged:     configChangeNodes.Keys(),
 	}
@@ -244,11 +254,14 @@ func (ov *OverReserve) GetDesyncedNodes(lh logr.Logger) DesyncedNodes {
 // too aggressive resync attempts, so to more, likely unnecessary, computation work on the scheduler side.
 func (ov *OverReserve) Resync() {
 	// we are not working with a specific pod, so we need a unique key to track this flow
-	lh_ := ov.lh.WithName(logging.FlowCacheSync).WithValues(logging.KeyLogID, logging.TimeLogID())
+	lh_ := ov.lh.WithName(logging.FlowCacheSync)
 	lh_.V(4).Info(logging.FlowBegin)
 	defer lh_.V(4).Info(logging.FlowEnd)
 
 	nodes := ov.GetDesyncedNodes(lh_)
+	// we start without the generation because of a chicken/egg problem; this is the earliest point at which we can attach its value.
+	lh_ = lh_.WithValues(logging.KeyGeneration, nodes.Generation)
+
 	// avoid as much as we can unnecessary work and logs.
 	if nodes.Len() == 0 {
 		lh_.V(5).Info("no dirty nodes detected")
@@ -331,6 +344,7 @@ func (ov *OverReserve) Resync() {
 func (ov *OverReserve) FlushNodes(lh logr.Logger, nrts ...*topologyv1alpha2.NodeResourceTopology) {
 	ov.lock.Lock()
 	defer ov.lock.Unlock()
+
 	for _, nrt := range nrts {
 		lh.V(2).Info("flushing", logging.KeyNode, nrt.Name)
 		ov.nrts.Update(nrt)
@@ -339,6 +353,14 @@ func (ov *OverReserve) FlushNodes(lh logr.Logger, nrts ...*topologyv1alpha2.Node
 		ov.nodesWithForeignPods.Delete(nrt.Name)
 		ov.nodesWithAttrUpdate.Delete(nrt.Name)
 	}
+
+	if len(nrts) == 0 {
+		return
+	}
+
+	// increase the generation only if we mutated the internal state
+	ov.generation += 1
+	lh.V(2).Info("generation", "new", ov.generation)
 }
 
 // to be used only in tests
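Reviewer note: a hedged sketch of one resync pass, to make the generation flow above easier to follow (the wrapper name `resyncSketch`, the `updated` slice, and the `sigs.k8s.io/scheduler-plugins` module path are assumptions; the real recomputation lives in `Resync`). The point: `GetDesyncedNodes` reports the generation it observed, every log line of the attempt carries it, and `FlushNodes` only advances it when it actually writes NRT objects, so later `GetCachedNRTCopy` logs showing the same generation refer to cache content produced by this attempt.

```go
package example

import (
	"github.com/go-logr/logr"
	topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"

	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/logging"
)

// resyncSketch mirrors the Resync flow: observe the dirty nodes together with
// the generation they were observed at, log under that generation, then flush
// the recomputed NRT objects, which bumps the generation only when the slice
// is non-empty.
func resyncSketch(lh logr.Logger, ov *cache.OverReserve, updated []*topologyv1alpha2.NodeResourceTopology) {
	nodes := ov.GetDesyncedNodes(lh)
	lh = lh.WithValues(logging.KeyGeneration, nodes.Generation)
	if nodes.Len() == 0 {
		lh.V(5).Info("no dirty nodes detected")
		return
	}
	// the real Resync recomputes `updated` from nodes.MaybeOverReserved and
	// nodes.ConfigChanged; here it is passed in to keep the sketch short
	ov.FlushNodes(lh, updated...)
}
```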
diff --git a/pkg/noderesourcetopology/cache/overreserve_test.go b/pkg/noderesourcetopology/cache/overreserve_test.go
index 0d02ad95d..33214b444 100644
--- a/pkg/noderesourcetopology/cache/overreserve_test.go
+++ b/pkg/noderesourcetopology/cache/overreserve_test.go
@@ -720,8 +720,8 @@ func TestNodeWithForeignPods(t *testing.T) {
 		t.Errorf("unexpected dirty nodes: %v", nodes.MaybeOverReserved)
 	}
 
-	_, ok := nrtCache.GetCachedNRTCopy(context.Background(), target, &corev1.Pod{})
-	if ok {
+	_, info := nrtCache.GetCachedNRTCopy(context.Background(), target, &corev1.Pod{})
+	if info.Fresh {
 		t.Errorf("succesfully got node with foreign pods!")
 	}
 }
diff --git a/pkg/noderesourcetopology/cache/passthrough.go b/pkg/noderesourcetopology/cache/passthrough.go
index ac2107613..29f1e2a46 100644
--- a/pkg/noderesourcetopology/cache/passthrough.go
+++ b/pkg/noderesourcetopology/cache/passthrough.go
@@ -40,14 +40,15 @@ func NewPassthrough(lh logr.Logger, client ctrlclient.Client) Interface {
 	}
 }
 
-func (pt Passthrough) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
+func (pt Passthrough) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
 	pt.lh.V(5).Info("lister for NRT plugin")
+	info := CachedNRTInfo{Fresh: true}
 	nrt := &topologyv1alpha2.NodeResourceTopology{}
 	if err := pt.client.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
 		pt.lh.V(5).Error(err, "cannot get nrts from lister")
-		return nil, true
+		return nil, info
 	}
-	return nrt, true
+	return nrt, info
 }
 
 func (pt Passthrough) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {}
diff --git a/pkg/noderesourcetopology/filter.go b/pkg/noderesourcetopology/filter.go
index 58ba44899..f627d0639 100644
--- a/pkg/noderesourcetopology/filter.go
+++ b/pkg/noderesourcetopology/filter.go
@@ -196,8 +196,9 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle
 	lh.V(4).Info(logging.FlowBegin)
 	defer lh.V(4).Info(logging.FlowEnd)
 
-	nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
-	if !ok {
+	nodeTopology, info := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
+	lh = lh.WithValues(logging.KeyGeneration, info.Generation)
+	if !info.Fresh {
 		lh.V(2).Info("invalid topology data")
 		return framework.NewStatus(framework.Unschedulable, "invalid node topology data")
 	}
diff --git a/pkg/noderesourcetopology/logging/logging.go b/pkg/noderesourcetopology/logging/logging.go
index 14d3ee2b1..33583facf 100644
--- a/pkg/noderesourcetopology/logging/logging.go
+++ b/pkg/noderesourcetopology/logging/logging.go
@@ -17,9 +17,7 @@ limitations under the License.
 package logging
 
 import (
-	"fmt"
 	"reflect"
-	"time"
 
 	corev1 "k8s.io/api/core/v1"
 )
@@ -33,6 +31,7 @@ const (
 	KeyFlow          string = "flow"
 	KeyContainer     string = "container"
 	KeyContainerKind string = "kind"
+	KeyGeneration    string = "gen"
 )
 
 const (
@@ -63,7 +62,3 @@ func PodUID(pod *corev1.Pod) string {
 	}
 	return string(pod.GetUID())
 }
-
-func TimeLogID() string {
-	return fmt.Sprintf("uts/%v", time.Now().UnixMilli())
-}
diff --git a/pkg/noderesourcetopology/score.go b/pkg/noderesourcetopology/score.go
index 88a92c414..e82c9a76f 100644
--- a/pkg/noderesourcetopology/score.go
+++ b/pkg/noderesourcetopology/score.go
@@ -73,9 +73,9 @@ func (tm *TopologyMatch) Score(ctx context.Context, state *framework.CycleState,
 		return framework.MaxNodeScore, nil
 	}
 
-	nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
-
-	if !ok {
+	nodeTopology, info := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
+	lh = lh.WithValues(logging.KeyGeneration, info.Generation)
+	if !info.Fresh {
 		lh.V(4).Info("noderesourcetopology is not valid for node")
 		return 0, nil
 	}