nrt: log: introduce and use "generation" for cache
In order to improve the debuggability of the overreserve cache, we
would like to
1. correlate the cache state being used with
2. the actions the resync loop is doing
3. more easily infer the current state of the cache

This change aims to improve points 1 and 2, while also trying to make
point 3 easier in the future.

We introduce the concept of "generation": an opaque,
monotonically increasing integer similar in spirit to the
`resourceVersion` kube API field.
Every time the internal state of the cache is updated, which happens
only in the resync loop by design, we increment the generation.
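
As a sketch, condensed from the `FlushNodes` change below (elided
lines are marked with `// ...`), the resync loop is the only writer and
bumps the counter only when it actually mutates the cached state:

    func (ov *OverReserve) FlushNodes(lh logr.Logger, nrts ...*topologyv1alpha2.NodeResourceTopology) {
        ov.lock.Lock()
        defer ov.lock.Unlock()
        for _, nrt := range nrts {
            lh.V(2).Info("flushing", logging.KeyNode, nrt.Name)
            ov.nrts.Update(nrt)
            // ... clear the per-node dirty/foreign/config-change markers ...
        }
        if len(nrts) == 0 {
            return // nothing mutated, keep the generation stable
        }
        // increase only if we mutated the internal state
        ov.generation += 1
        lh.V(2).Info("generation", "new", ov.generation)
    }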

GetCachedNRTCopy will also return the generation of the data
being used, so we now have a uniform way to correlate the readers
and the writer of the cache, and we gain better visibility into the
data being consumed.
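
For example, a reader such as the `Filter` hook (mirroring the change
in filter.go below) now attaches the returned generation to its log
context before acting on the data:

    nodeTopology, info := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
    lh = lh.WithValues(logging.KeyGeneration, info.Generation)
    if !info.Fresh {
        lh.V(2).Info("invalid topology data")
        return framework.NewStatus(framework.Unschedulable, "invalid node topology data")
    }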

With verbose enough logging, the generation makes it easier
(albeit admittedly still clunky) to reconstruct the chain of changes
which led to a given cache state; this was much harder previously.
Similarly, there's now a clear way to learn which cache state was
used to make a given scheduling decision, which was also much harder before.

The changes mostly involve logging; to avoid a proliferation of return
values, however, a trivial refactoring is done in `GetCachedNRTCopy`.
A beneficial side effect is much improved documentation of the
return values.

Signed-off-by: Francesco Romani <[email protected]>
ffromani committed Sep 9, 2024
1 parent 0744b26 commit 0dae3ec
Showing 10 changed files with 68 additions and 36 deletions.
20 changes: 16 additions & 4 deletions pkg/noderesourcetopology/cache/cache.go
@@ -24,17 +24,29 @@ import (
topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
)

type CachedNRTInfo struct {
// Generation is akin to the object resourceVersion and represents
// the observed state in the cache. It's an opaque, monotonically increasing number which can only be compared for equality
// and which is only increased in the resync loop. It is used to cross correlate resync attempts with the observed
// cache content. Used only in logging. If the cache implementation has no concept of caching or generation,
// it should always return 0 (zero).
Generation uint64

// Fresh signals the caller if the NRT data is fresh.
// If true, the data is fresh and ready to be consumed.
// If false, the data is stale and the caller needs to wait for a future refresh.
Fresh bool
}

type Interface interface {
// GetCachedNRTCopy retrieves a NRT copy from cache, and then deducts over-reserved resources if necessary.
// It will be used as the source of truth across the Pod's scheduling cycle.
// Over-reserved resources are the resources consumed by pods scheduled to that node after the last update
// of NRT pertaining to the same node, pessimistically overallocated on ALL the NUMA zones of the node.
// The pod argument is used only for logging purposes.
// Returns nil if there is no NRT data available for the node named `nodeName`.
// Returns a boolean to signal the caller if the NRT data is fresh.
// If true, the data is fresh and ready to be consumed.
// If false, the data is stale and the caller need to wait for a future refresh.
GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool)
// Returns a CachedNRTInfo describing the NRT data returned. Meaningful only if `nrt` != nil.
GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo)

// NodeMaybeOverReserved declares a node was filtered out for not enough resources available.
// This means this node is eligible for a resync. When a node is marked discarded (dirty), it matters not
6 changes: 3 additions & 3 deletions pkg/noderesourcetopology/cache/cache_test.go
@@ -100,10 +100,10 @@ func checkGetCachedNRTCopy(t *testing.T, makeCache func(client ctrlclient.WithWa
nrtCache.NodeHasForeignPods(tc.nodeName, pod)
}

gotNRT, gotOK := nrtCache.GetCachedNRTCopy(ctx, tc.nodeName, pod)
gotNRT, gotInfo := nrtCache.GetCachedNRTCopy(ctx, tc.nodeName, pod)

if gotOK != tc.expectedOK {
t.Fatalf("unexpected object status from cache: got: %v expected: %v", gotOK, tc.expectedOK)
if gotInfo.Fresh != tc.expectedOK {
t.Fatalf("unexpected object status from cache: got: %v expected: %v", gotInfo.Fresh, tc.expectedOK)
}
if gotNRT != nil && tc.expectedNRT == nil {
t.Fatalf("object from cache not nil but expected nil")
9 changes: 5 additions & 4 deletions pkg/noderesourcetopology/cache/discardreserved.go
@@ -58,20 +58,21 @@ func NewDiscardReserved(lh logr.Logger, client ctrlclient.Client) Interface {
}
}

func (pt *DiscardReserved) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
func (pt *DiscardReserved) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
pt.rMutex.RLock()
defer pt.rMutex.RUnlock()
if t, ok := pt.reservationMap[nodeName]; ok {
if len(t) > 0 {
return nil, false
return nil, CachedNRTInfo{}
}
}

info := CachedNRTInfo{Fresh: true}
nrt := &topologyv1alpha2.NodeResourceTopology{}
if err := pt.client.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
return nil, true
return nil, info
}
return nrt, true
return nrt, info
}

func (pt *DiscardReserved) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {}
4 changes: 2 additions & 2 deletions pkg/noderesourcetopology/cache/discardreserved_test.go
@@ -66,8 +66,8 @@ func TestDiscardReservedNodesGetNRTCopyFails(t *testing.T) {
},
}

nrtObj, ok := nrtCache.GetCachedNRTCopy(context.Background(), "node1", &corev1.Pod{})
if ok {
nrtObj, nrtInfo := nrtCache.GetCachedNRTCopy(context.Background(), "node1", &corev1.Pod{})
if nrtInfo.Fresh {
t.Fatal("expected false\ngot true\n")
}
if nrtObj != nil {
36 changes: 29 additions & 7 deletions pkg/noderesourcetopology/cache/overreserve.go
@@ -45,6 +45,7 @@ type OverReserve struct {
lh logr.Logger
client ctrlclient.Reader
lock sync.Mutex
generation uint64
nrts *nrtStore
assumedResources map[string]*resourceStore // nodeName -> resourceStore
// nodesMaybeOverreserved counts how many times a node is filtered out. This is used as trigger condition to try
@@ -97,30 +97,33 @@ func NewOverReserve(ctx context.Context, lh logr.Logger, cfg *apiconfig.NodeReso
return obj, nil
}

func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
ov.lock.Lock()
defer ov.lock.Unlock()
if ov.nodesWithForeignPods.IsSet(nodeName) {
return nil, false
return nil, CachedNRTInfo{}
}

info := CachedNRTInfo{Fresh: true}
nrt := ov.nrts.GetNRTCopyByNodeName(nodeName)
if nrt == nil {
return nil, true
return nil, info
}

info.Generation = ov.generation
nodeAssumedResources, ok := ov.assumedResources[nodeName]
if !ok {
return nrt, true
return nrt, info
}

logID := klog.KObj(pod)
lh := ov.lh.WithValues(logging.KeyPod, logID, logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)
lh := ov.lh.WithValues(logging.KeyPod, logID, logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName, logging.KeyGeneration, ov.generation)

lh.V(6).Info("NRT", "fromcache", stringify.NodeResourceTopologyResources(nrt))
nodeAssumedResources.UpdateNRT(nrt, logging.KeyPod, logID)

lh.V(5).Info("NRT", "withassumed", stringify.NodeResourceTopologyResources(nrt))
return nrt, true
return nrt, info
}

func (ov *OverReserve) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {
@@ -176,6 +180,7 @@ func (ov *OverReserve) UnreserveNodeResources(nodeName string, pod *corev1.Pod)
}

type DesyncedNodes struct {
Generation uint64
MaybeOverReserved []string
ConfigChanged []string
}
@@ -207,6 +212,10 @@ func (rn DesyncedNodes) DirtyCount() int {
func (ov *OverReserve) GetDesyncedNodes(lh logr.Logger) DesyncedNodes {
ov.lock.Lock()
defer ov.lock.Unlock()

// make sure to log the generation to be able to cross correlate with later logs
lh = lh.WithValues(logging.KeyGeneration, ov.generation)

// this is intentionally aggressive. We don't yet make any attempt to find out if the
// node was discarded because pessimistically overreserved (which should indeed trigger
// a resync) or if it was discarded because the actual resources on the node really were
@@ -229,6 +238,7 @@ func (ov *OverReserve) GetDesyncedNodes(lh logr.Logger) DesyncedNodes {
lh.V(4).Info("found dirty nodes", "foreign", foreignCount, "discarded", overreservedCount, "configChange", configChangeCount, "total", nodes.Len())
}
return DesyncedNodes{
Generation: ov.generation,
MaybeOverReserved: nodes.Keys(),
ConfigChanged: configChangeNodes.Keys(),
}
@@ -244,11 +254,14 @@ func (ov *OverReserve) GetDesyncedNodes(lh logr.Logger) DesyncedNodes {
// too aggressive resync attempts, so to more, likely unnecessary, computation work on the scheduler side.
func (ov *OverReserve) Resync() {
// we are not working with a specific pod, so we need a unique key to track this flow
lh_ := ov.lh.WithName(logging.FlowCacheSync).WithValues(logging.KeyLogID, logging.TimeLogID())
lh_ := ov.lh.WithName(logging.FlowCacheSync)
lh_.V(4).Info(logging.FlowBegin)
defer lh_.V(4).Info(logging.FlowEnd)

nodes := ov.GetDesyncedNodes(lh_)
// we start without the generation because of a chicken/egg problem. This is the earliest we can use the generation value.
lh_ = lh_.WithValues(logging.KeyGeneration, nodes.Generation)

// avoid as much as we can unnecessary work and logs.
if nodes.Len() == 0 {
lh_.V(5).Info("no dirty nodes detected")
@@ -331,6 +344,7 @@ func (ov *OverReserve) FlushNodes(lh logr.Logger, nrts ...*topologyv1alpha2.Node
func (ov *OverReserve) FlushNodes(lh logr.Logger, nrts ...*topologyv1alpha2.NodeResourceTopology) {
ov.lock.Lock()
defer ov.lock.Unlock()

for _, nrt := range nrts {
lh.V(2).Info("flushing", logging.KeyNode, nrt.Name)
ov.nrts.Update(nrt)
Expand All @@ -339,6 +353,14 @@ func (ov *OverReserve) FlushNodes(lh logr.Logger, nrts ...*topologyv1alpha2.Node
ov.nodesWithForeignPods.Delete(nrt.Name)
ov.nodesWithAttrUpdate.Delete(nrt.Name)
}

if len(nrts) == 0 {
return
}

// increase only if we mutated the internal state
ov.generation += 1
lh.V(2).Info("generation", "new", ov.generation)
}

// to be used only in tests
4 changes: 2 additions & 2 deletions pkg/noderesourcetopology/cache/overreserve_test.go
@@ -720,8 +720,8 @@ func TestNodeWithForeignPods(t *testing.T) {
t.Errorf("unexpected dirty nodes: %v", nodes.MaybeOverReserved)
}

_, ok := nrtCache.GetCachedNRTCopy(context.Background(), target, &corev1.Pod{})
if ok {
_, info := nrtCache.GetCachedNRTCopy(context.Background(), target, &corev1.Pod{})
if info.Fresh {
t.Errorf("succesfully got node with foreign pods!")
}
}
7 changes: 4 additions & 3 deletions pkg/noderesourcetopology/cache/passthrough.go
@@ -40,14 +40,15 @@ func NewPassthrough(lh logr.Logger, client ctrlclient.Client) Interface {
}
}

func (pt Passthrough) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
func (pt Passthrough) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
pt.lh.V(5).Info("lister for NRT plugin")
info := CachedNRTInfo{Fresh: true}
nrt := &topologyv1alpha2.NodeResourceTopology{}
if err := pt.client.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
pt.lh.V(5).Error(err, "cannot get nrts from lister")
return nil, true
return nil, info
}
return nrt, true
return nrt, info
}

func (pt Passthrough) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {}
5 changes: 3 additions & 2 deletions pkg/noderesourcetopology/filter.go
@@ -196,8 +196,9 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle
lh.V(4).Info(logging.FlowBegin)
defer lh.V(4).Info(logging.FlowEnd)

nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
if !ok {
nodeTopology, info := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
lh = lh.WithValues(logging.KeyGeneration, info.Generation)
if !info.Fresh {
lh.V(2).Info("invalid topology data")
return framework.NewStatus(framework.Unschedulable, "invalid node topology data")
}
7 changes: 1 addition & 6 deletions pkg/noderesourcetopology/logging/logging.go
@@ -17,9 +17,7 @@ limitations under the License.
package logging

import (
"fmt"
"reflect"
"time"

corev1 "k8s.io/api/core/v1"
)
@@ -33,6 +31,7 @@ const (
KeyFlow string = "flow"
KeyContainer string = "container"
KeyContainerKind string = "kind"
KeyGeneration string = "generation"
)

const (
@@ -63,7 +62,3 @@ func PodUID(pod *corev1.Pod) string {
}
return string(pod.GetUID())
}

func TimeLogID() string {
return fmt.Sprintf("uts/%v", time.Now().UnixMilli())
}
6 changes: 3 additions & 3 deletions pkg/noderesourcetopology/score.go
@@ -73,9 +73,9 @@ func (tm *TopologyMatch) Score(ctx context.Context, state *framework.CycleState,
return framework.MaxNodeScore, nil
}

nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)

if !ok {
nodeTopology, info := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
lh = lh.WithValues(logging.KeyGeneration, info.Generation)
if !info.Fresh {
lh.V(4).Info("noderesourcetopology is not valid for node")
return 0, nil
}
