nrt: log: introduce and use "generation" for cache #798

Merged: 1 commit, Sep 11, 2024
20 changes: 16 additions & 4 deletions pkg/noderesourcetopology/cache/cache.go
@@ -24,17 +24,29 @@ import (
topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
)

type CachedNRTInfo struct {
// Generation is akin to the object resourceVersion and represents
// the observed state in the cache. It is an opaque, monotonically increasing number which can only be compared
// for equality and which is only increased in the resync loop. It is used to cross-correlate resync attempts
// with the observed cache content. Used only in logging. If the cache implementation has no concept of caching
// nor generation, it should always return 0 (zero).
Generation uint64

// Fresh signals the caller whether the NRT data is fresh.
// If true, the data is fresh and ready to be consumed.
// If false, the data is stale and the caller needs to wait for a future refresh.
Fresh bool
}

type Interface interface {
// GetCachedNRTCopy retrieves a NRT copy from cache, and then deducts over-reserved resources if necessary.
// It will be used as the source of truth across the Pod's scheduling cycle.
// Over-reserved resources are the resources consumed by pods scheduled to that node after the last update
// of NRT pertaining to the same node, pessimistically overallocated on ALL the NUMA zones of the node.
// The pod argument is used only for logging purposes.
// Returns nil if there is no NRT data available for the node named `nodeName`.
// Returns a boolean to signal the caller if the NRT data is fresh.
// If true, the data is fresh and ready to be consumed.
// If false, the data is stale and the caller need to wait for a future refresh.
GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool)
// Returns a CachedNRTInfo describing the NRT data returned. Meaningful only if `nrt` != nil.
GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo)

// NodeMaybeOverReserved declares a node was filtered out for not enough resources available.
// This means this node is eligible for a resync. When a node is marked discarded (dirty), it matters not
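For orientation, a minimal sketch (not part of this diff) of how a caller is expected to consume the new return value; the getFreshNRT helper, the error handling, and the module paths are assumptions made for the sketch, not code from the PR.

package example

import (
	"context"
	"fmt"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"

	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/logging"
)

// getFreshNRT shows the intended consumption pattern: attach the generation to the
// logger for cross-correlation, then act on Fresh only when NRT data exists at all.
func getFreshNRT(ctx context.Context, lh logr.Logger, nrtCache cache.Interface, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, error) {
	nrt, info := nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
	// Generation is opaque: log it so scheduling decisions can be matched against
	// resync-loop logs, but never interpret it beyond equality.
	lh = lh.WithValues(logging.KeyGeneration, info.Generation)
	if nrt == nil {
		// per the interface contract, info is meaningful only when nrt != nil
		return nil, fmt.Errorf("no NodeResourceTopology for node %q", nodeName)
	}
	if !info.Fresh {
		lh.V(2).Info("stale NRT data, waiting for a future resync")
		return nil, fmt.Errorf("stale NodeResourceTopology for node %q", nodeName)
	}
	return nrt, nil
}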
6 changes: 3 additions & 3 deletions pkg/noderesourcetopology/cache/cache_test.go
@@ -100,10 +100,10 @@ func checkGetCachedNRTCopy(t *testing.T, makeCache func(client ctrlclient.WithWa
nrtCache.NodeHasForeignPods(tc.nodeName, pod)
}

gotNRT, gotOK := nrtCache.GetCachedNRTCopy(ctx, tc.nodeName, pod)
gotNRT, gotInfo := nrtCache.GetCachedNRTCopy(ctx, tc.nodeName, pod)

if gotOK != tc.expectedOK {
t.Fatalf("unexpected object status from cache: got: %v expected: %v", gotOK, tc.expectedOK)
if gotInfo.Fresh != tc.expectedOK {
t.Fatalf("unexpected object status from cache: got: %v expected: %v", gotInfo.Fresh, tc.expectedOK)
}
if gotNRT != nil && tc.expectedNRT == nil {
t.Fatalf("object from cache not nil but expected nil")
9 changes: 5 additions & 4 deletions pkg/noderesourcetopology/cache/discardreserved.go
@@ -58,20 +58,21 @@ func NewDiscardReserved(lh logr.Logger, client ctrlclient.Client) Interface {
}
}

func (pt *DiscardReserved) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
func (pt *DiscardReserved) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
pt.rMutex.RLock()
defer pt.rMutex.RUnlock()
if t, ok := pt.reservationMap[nodeName]; ok {
if len(t) > 0 {
return nil, false
return nil, CachedNRTInfo{}
}
}

info := CachedNRTInfo{Fresh: true}
nrt := &topologyv1alpha2.NodeResourceTopology{}
if err := pt.client.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
return nil, true
return nil, info
}
return nrt, true
return nrt, info
}

func (pt *DiscardReserved) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {}
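The same mechanical migration applies to any out-of-tree implementation of the cache Interface: the bare bool becomes a CachedNRTInfo, and implementations with no resync loop leave Generation at its zero value, as the interface documentation prescribes. A hypothetical example, sketched as if it lived in the cache package (staticCache and its byName map are not part of this PR):

// staticCache is a hypothetical implementation used only to illustrate the migration.
type staticCache struct {
	byName map[string]*topologyv1alpha2.NodeResourceTopology
}

func (c *staticCache) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
	nrt, ok := c.byName[nodeName]
	if !ok {
		// previously: return nil, false
		return nil, CachedNRTInfo{}
	}
	// previously: return nrt.DeepCopy(), true
	// Generation stays 0: this implementation has no caching generation to report.
	return nrt.DeepCopy(), CachedNRTInfo{Fresh: true}
}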
4 changes: 2 additions & 2 deletions pkg/noderesourcetopology/cache/discardreserved_test.go
@@ -66,8 +66,8 @@ func TestDiscardReservedNodesGetNRTCopyFails(t *testing.T) {
},
}

nrtObj, ok := nrtCache.GetCachedNRTCopy(context.Background(), "node1", &corev1.Pod{})
if ok {
nrtObj, nrtInfo := nrtCache.GetCachedNRTCopy(context.Background(), "node1", &corev1.Pod{})
if nrtInfo.Fresh {
t.Fatal("expected false\ngot true\n")
}
if nrtObj != nil {
36 changes: 29 additions & 7 deletions pkg/noderesourcetopology/cache/overreserve.go
@@ -45,6 +45,7 @@ type OverReserve struct {
lh logr.Logger
client ctrlclient.Reader
lock sync.Mutex
generation uint64
nrts *nrtStore
assumedResources map[string]*resourceStore // nodeName -> resourceStore
// nodesMaybeOverreserved counts how many times a node is filtered out. This is used as trigger condition to try
@@ -97,30 +98,33 @@ func NewOverReserve(ctx context.Context, lh logr.Logger, cfg *apiconfig.NodeReso
return obj, nil
}

func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
Contributor:

Suggested change:
func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, pod *corev1.Pod) (nrt *topologyv1alpha2.NodeResourceTopology, info CachedNRTInfo) {

and then just change all the returns to return nrt, info?

Contributor Author:

I'm not a fan of named returns, for admittedly silly reasons, but this case looks interesting. I'll check, thanks!

Contributor Author:

OK, I tried it out. It looks very nice in the overreserve implementation, but not so great in the discardreserved and passthrough implementations; IMO it leads to slightly more convoluted code than we have now. I value keeping the implementations as consistent with each other as possible, so overall I still prefer the current approach and would rather not use named return parameters just yet.

ov.lock.Lock()
defer ov.lock.Unlock()
if ov.nodesWithForeignPods.IsSet(nodeName) {
return nil, false
return nil, CachedNRTInfo{}
}

info := CachedNRTInfo{Fresh: true}
nrt := ov.nrts.GetNRTCopyByNodeName(nodeName)
if nrt == nil {
return nil, true
return nil, info
}

info.Generation = ov.generation
nodeAssumedResources, ok := ov.assumedResources[nodeName]
if !ok {
return nrt, true
return nrt, info
}

logID := klog.KObj(pod)
lh := ov.lh.WithValues(logging.KeyPod, logID, logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)
lh := ov.lh.WithValues(logging.KeyPod, logID, logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName, logging.KeyGeneration, ov.generation)

lh.V(6).Info("NRT", "fromcache", stringify.NodeResourceTopologyResources(nrt))
nodeAssumedResources.UpdateNRT(nrt, logging.KeyPod, logID)

lh.V(5).Info("NRT", "withassumed", stringify.NodeResourceTopologyResources(nrt))
return nrt, true
return nrt, info
}

func (ov *OverReserve) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {
@@ -176,6 +180,7 @@ func (ov *OverReserve) UnreserveNodeResources(nodeName string, pod *corev1.Pod)
}

type DesyncedNodes struct {
Generation uint64
MaybeOverReserved []string
ConfigChanged []string
}
@@ -207,6 +212,10 @@ func (rn DesyncedNodes) DirtyCount() int {
func (ov *OverReserve) GetDesyncedNodes(lh logr.Logger) DesyncedNodes {
ov.lock.Lock()
defer ov.lock.Unlock()

// make sure to log the generation so later logs can be cross-correlated with this pass
lh = lh.WithValues(logging.KeyGeneration, ov.generation)

// this is intentionally aggressive. We don't yet make any attempt to find out if the
// node was discarded because pessimistically overreserved (which should indeed trigger
// a resync) or if it was discarded because the actual resources on the node really were
@@ -229,6 +238,7 @@ func (ov *OverReserve) GetDesyncedNodes(lh logr.Logger) DesyncedNodes {
lh.V(4).Info("found dirty nodes", "foreign", foreignCount, "discarded", overreservedCount, "configChange", configChangeCount, "total", nodes.Len())
}
return DesyncedNodes{
Generation: ov.generation,
MaybeOverReserved: nodes.Keys(),
ConfigChanged: configChangeNodes.Keys(),
}
@@ -244,11 +254,14 @@ func (ov *OverReserve) GetDesyncedNodes(lh logr.Logger) DesyncedNodes {
// too aggressive resync attempts, so to more, likely unnecessary, computation work on the scheduler side.
func (ov *OverReserve) Resync() {
// we are not working with a specific pod, so we need a unique key to track this flow
lh_ := ov.lh.WithName(logging.FlowCacheSync).WithValues(logging.KeyLogID, logging.TimeLogID())
lh_ := ov.lh.WithName(logging.FlowCacheSync)
lh_.V(4).Info(logging.FlowBegin)
defer lh_.V(4).Info(logging.FlowEnd)

nodes := ov.GetDesyncedNodes(lh_)
// we start without the generation because of a chicken/egg problem; this is the earliest point at which we can use the generation value.
lh_ = lh_.WithValues(logging.KeyGeneration, nodes.Generation)

// avoid as much as we can unnecessary work and logs.
if nodes.Len() == 0 {
lh_.V(5).Info("no dirty nodes detected")
@@ -331,6 +344,7 @@ func (ov *OverReserve) Resync() {
func (ov *OverReserve) FlushNodes(lh logr.Logger, nrts ...*topologyv1alpha2.NodeResourceTopology) {
ov.lock.Lock()
defer ov.lock.Unlock()

for _, nrt := range nrts {
lh.V(2).Info("flushing", logging.KeyNode, nrt.Name)
ov.nrts.Update(nrt)
@@ -339,6 +353,14 @@ func (ov *OverReserve) FlushNodes(lh logr.Logger, nrts ...*topologyv1alpha2.Node
ov.nodesWithForeignPods.Delete(nrt.Name)
ov.nodesWithAttrUpdate.Delete(nrt.Name)
}

if len(nrts) == 0 {
return
}

// increase only if we mutated the internal state
ov.generation += 1
lh.V(2).Info("generation", "new", ov.generation)
}

// to be used only in tests
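A hedged sketch of the contract this hunk establishes, written as a test-style helper: FlushNodes advances the generation only when it actually writes NRT objects into the store, and GetDesyncedNodes snapshots the value so the resync flow can log it. The helper name and its arguments are assumptions; it would need the cache package's existing test scaffolding around it.

// checkGenerationAdvances is an illustrative helper, not part of this diff.
func checkGenerationAdvances(t *testing.T, lh logr.Logger, ov *OverReserve, nrt *topologyv1alpha2.NodeResourceTopology) {
	before := ov.GetDesyncedNodes(lh).Generation

	// Flushing nothing must not mutate state, so the observed generation is unchanged.
	ov.FlushNodes(lh)
	if got := ov.GetDesyncedNodes(lh).Generation; got != before {
		t.Fatalf("generation changed without a state mutation: %v -> %v", before, got)
	}

	// Flushing at least one NRT object mutates the store, so the generation observed
	// afterwards must differ. It is opaque, so compare only for (in)equality.
	ov.FlushNodes(lh, nrt)
	if got := ov.GetDesyncedNodes(lh).Generation; got == before {
		t.Fatalf("generation did not change after flushing NRT data (still %v)", got)
	}
}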
4 changes: 2 additions & 2 deletions pkg/noderesourcetopology/cache/overreserve_test.go
@@ -720,8 +720,8 @@ func TestNodeWithForeignPods(t *testing.T) {
t.Errorf("unexpected dirty nodes: %v", nodes.MaybeOverReserved)
}

_, ok := nrtCache.GetCachedNRTCopy(context.Background(), target, &corev1.Pod{})
if ok {
_, info := nrtCache.GetCachedNRTCopy(context.Background(), target, &corev1.Pod{})
if info.Fresh {
t.Errorf("succesfully got node with foreign pods!")
}
}
7 changes: 4 additions & 3 deletions pkg/noderesourcetopology/cache/passthrough.go
@@ -40,14 +40,15 @@ func NewPassthrough(lh logr.Logger, client ctrlclient.Client) Interface {
}
}

func (pt Passthrough) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, bool) {
func (pt Passthrough) GetCachedNRTCopy(ctx context.Context, nodeName string, _ *corev1.Pod) (*topologyv1alpha2.NodeResourceTopology, CachedNRTInfo) {
pt.lh.V(5).Info("lister for NRT plugin")
info := CachedNRTInfo{Fresh: true}
nrt := &topologyv1alpha2.NodeResourceTopology{}
if err := pt.client.Get(ctx, types.NamespacedName{Name: nodeName}, nrt); err != nil {
pt.lh.V(5).Error(err, "cannot get nrts from lister")
return nil, true
return nil, info
}
return nrt, true
return nrt, info
}

func (pt Passthrough) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {}
5 changes: 3 additions & 2 deletions pkg/noderesourcetopology/filter.go
@@ -196,8 +196,9 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle
lh.V(4).Info(logging.FlowBegin)
defer lh.V(4).Info(logging.FlowEnd)

nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
if !ok {
nodeTopology, info := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
lh = lh.WithValues(logging.KeyGeneration, info.Generation)
if !info.Fresh {
lh.V(2).Info("invalid topology data")
return framework.NewStatus(framework.Unschedulable, "invalid node topology data")
}
7 changes: 1 addition & 6 deletions pkg/noderesourcetopology/logging/logging.go
@@ -17,9 +17,7 @@ limitations under the License.
package logging

import (
"fmt"
"reflect"
"time"

corev1 "k8s.io/api/core/v1"
)
@@ -33,6 +31,7 @@ const (
KeyFlow string = "flow"
KeyContainer string = "container"
KeyContainerKind string = "kind"
KeyGeneration string = "generation"
)

const (
Expand Down Expand Up @@ -63,7 +62,3 @@ func PodUID(pod *corev1.Pod) string {
}
return string(pod.GetUID())
}

func TimeLogID() string {
return fmt.Sprintf("uts/%v", time.Now().UnixMilli())
}
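To show why TimeLogID could be dropped: the resync flow now keys its log lines on the cache generation rather than a wall-clock ID, and the scheduling-side flows attach the same key, so both can be matched after the fact. A small sketch (the gen and info variables are assumed to be in scope; klog.Background and the logr chaining are standard API):

// Cache side: every line of one resync pass carries the same generation value.
lh := klog.Background().WithName(logging.FlowCacheSync).WithValues(logging.KeyGeneration, gen)
lh.V(4).Info("resync begin")

// Scheduling side: Filter/Score attach the generation returned by GetCachedNRTCopy
// under the same key, so the shared generation value ties a scheduling decision to
// the resync pass that produced the cache state it saw.
lh = klog.Background().WithValues(logging.KeyGeneration, info.Generation)
lh.V(2).Info("invalid topology data")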
6 changes: 3 additions & 3 deletions pkg/noderesourcetopology/score.go
@@ -73,9 +73,9 @@ func (tm *TopologyMatch) Score(ctx context.Context, state *framework.CycleState,
return framework.MaxNodeScore, nil
}

nodeTopology, ok := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)

if !ok {
nodeTopology, info := tm.nrtCache.GetCachedNRTCopy(ctx, nodeName, pod)
lh = lh.WithValues(logging.KeyGeneration, info.Generation)
if !info.Fresh {
lh.V(4).Info("noderesourcetopology is not valid for node")
return 0, nil
}