Skip to content

Commit

Permalink
Merge pull request #759 from h-w-chen/dev/dev/pap-advisor-metrics2
Browse files Browse the repository at this point in the history
metrics(sysadvisor) - power aware advisor emits metrics of essential ops
  • Loading branch information
gary-lgy authored Jan 13, 2025
2 parents 31410da + c774d9a commit f3d81a4
Show file tree
Hide file tree
Showing 23 changed files with 442 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPowerAction_String(t *testing.T) {
op: spec.InternalOpFreqCap,
arg: 255,
},
want: "op: FreqCap, arg: 255",
want: "op: cap, arg: 255",
},
}
for _, tt := range tests {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type dvfsTracker struct {
dvfsAccumEffect int
inDVFS bool
prevPower int

capperProber CapperProber
}

func (d *dvfsTracker) getDVFSAllowPercent() int {
Expand All @@ -35,9 +37,13 @@ func (d *dvfsTracker) getDVFSAllowPercent() int {
return leftPercentage
}

// isCapperAvailable reports whether a capper prober has been configured and
// that prober currently says the capper is ready. A tracker without a prober
// is treated as having no capper available.
func (d *dvfsTracker) isCapperAvailable() bool {
	prober := d.capperProber
	if prober == nil {
		return false
	}
	return prober.IsCapperReady()
}

func (d *dvfsTracker) update(actualWatt, desiredWatt int) {
// only accumulate when dvfs is engaged
if d.prevPower >= 0 && d.inDVFS {
if d.prevPower >= 0 && d.inDVFS && d.isCapperAvailable() {
// if actual power is more than previous, likely previous round dvfs took no effect; not to take into account
if actualWatt < d.prevPower {
dvfsEffect := (d.prevPower - actualWatt) * 100 / d.prevPower
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,24 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

// mockCapperProber is a testify-based test double that provides the
// IsCapperReady method for the dvfsTracker tests.
type mockCapperProber struct {
	mock.Mock
}

// IsCapperReady records the call on the embedded mock and returns the first
// configured return value as a bool.
func (m *mockCapperProber) IsCapperReady() bool {
	return m.Called().Bool(0)
}

func Test_dvfsTracker_update(t *testing.T) {
t.Parallel()

mockProber := new(mockCapperProber)
mockProber.On("IsCapperReady").Return(true)

type fields struct {
dvfsUsed int
indvfs bool
Expand Down Expand Up @@ -51,6 +65,7 @@ func Test_dvfsTracker_update(t *testing.T) {
desiredWatt: 85,
},
wantDVFSTracker: dvfsTracker{
capperProber: mockProber,
dvfsAccumEffect: 3,
inDVFS: false,
prevPower: 90,
Expand All @@ -68,6 +83,7 @@ func Test_dvfsTracker_update(t *testing.T) {
desiredWatt: 85,
},
wantDVFSTracker: dvfsTracker{
capperProber: mockProber,
dvfsAccumEffect: 13,
inDVFS: true,
prevPower: 90,
Expand All @@ -85,6 +101,7 @@ func Test_dvfsTracker_update(t *testing.T) {
desiredWatt: 85,
},
wantDVFSTracker: dvfsTracker{
capperProber: mockProber,
dvfsAccumEffect: 3,
inDVFS: true,
prevPower: 101,
Expand All @@ -96,6 +113,7 @@ func Test_dvfsTracker_update(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
d := &dvfsTracker{
capperProber: mockProber,
dvfsAccumEffect: tt.fields.dvfsUsed,
inDVFS: tt.fields.indvfs,
prevPower: tt.fields.prevPower,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,38 @@ import (
"github.com/pkg/errors"

"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/advisor/action"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/capper"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/spec"
"github.com/kubewharf/katalyst-core/pkg/consts"
metrictypes "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

// threshold of cpu usage that allows voluntary dvfs
const voluntaryDVFSCPUUsageThreshold = 0.45
const (
// voluntaryDVFSCPUUsageThreshold is the threshold of cpu usage that allows voluntary dvfs.
voluntaryDVFSCPUUsageThreshold = 0.45

// metricPowerAwareDVFSEffect is the metric name under which the accumulated dvfs effect is stored.
metricPowerAwareDVFSEffect = "node_power_accu_dvfs_effect"
)

// EvictableProber is the probe evictFirstStrategy keeps in its evictableProber field.
type EvictableProber interface {
// HasEvictablePods presumably reports whether any pod is still eligible for
// eviction — NOTE(review): confirm against the implementers.
HasEvictablePods() bool
}

// CapperProber lets the advisor ask whether the power capper is ready.
// It is only applicable to the advisor; a capper actor (client) is not
// required to implement it. NewEvictFirstStrategy obtains it by type-asserting
// the supplied capper, and dvfsTracker treats a nil prober as "not available".
type CapperProber interface {
// IsCapperReady returns true when the capper is considered ready.
IsCapperReady() bool
}

// evictFirstStrategy always attempts to evict low priority pods if any; only after all are exhausted will it resort to DVFS means.
// besides, it will continue to try the best to meet the alert spec, regardless of the alert update time.
// alert level has the following meanings in this strategy:
// P1 - eviction only;
// P0 - evict if applicable; otherwise conduct DVFS once if needed (DVFS is limited to 10%);
// S0 - DVFS in urgency (no limit on DVFS)
type evictFirstStrategy struct {
emitter metrics.MetricEmitter
coefficient exponentialDecay
evictableProber EvictableProber
dvfsTracker dvfsTracker
Expand Down Expand Up @@ -135,6 +147,7 @@ func (e *evictFirstStrategy) yieldActionPlan(op, internalOp spec.InternalOp, act

func (e *evictFirstStrategy) RecommendAction(actualWatt int, desiredWatt int, alert spec.PowerAlert, internalOp spec.InternalOp, ttl time.Duration) action.PowerAction {
e.dvfsTracker.update(actualWatt, desiredWatt)
e.emitDVFSAccumulatedEffect(e.dvfsTracker.dvfsAccumEffect)
general.InfofV(6, "pap: dvfs effect: %d", e.dvfsTracker.dvfsAccumEffect)

if actualWatt <= desiredWatt {
Expand All @@ -152,12 +165,21 @@ func (e *evictFirstStrategy) RecommendAction(actualWatt int, desiredWatt int, al
return actionPlan
}

func NewEvictFirstStrategy(prober EvictableProber, metricsReader metrictypes.MetricsReader) PowerActionStrategy {
// emitDVFSAccumulatedEffect stores the given percentage as a raw int64 metric
// under metricPowerAwareDVFSEffect.
func (e *evictFirstStrategy) emitDVFSAccumulatedEffect(percentage int) {
	effect := int64(percentage)
	// The emitter's error is deliberately discarded.
	_ = e.emitter.StoreInt64(metricPowerAwareDVFSEffect, effect, metrics.MetricTypeNameRaw)
}

func NewEvictFirstStrategy(emitter metrics.MetricEmitter, prober EvictableProber, metricsReader metrictypes.MetricsReader, capper capper.PowerCapper) PowerActionStrategy {
general.Infof("pap: using EvictFirst strategy")
capperProber, _ := capper.(CapperProber)
return &evictFirstStrategy{
emitter: emitter,
coefficient: exponentialDecay{b: defaultDecayB},
evictableProber: prober,
dvfsTracker: dvfsTracker{dvfsAccumEffect: 0},
metricsReader: metricsReader,
dvfsTracker: dvfsTracker{
dvfsAccumEffect: 0,
capperProber: capperProber,
},
metricsReader: metricsReader,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/advisor/action"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/spec"
"github.com/kubewharf/katalyst-core/pkg/metrics"
)

type mockEvicableProber struct {
Expand Down Expand Up @@ -174,9 +175,11 @@ func Test_evictFirstStrategy_RecommendAction(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
e := &evictFirstStrategy{
emitter: &metrics.DummyMetrics{},
coefficient: tt.fields.coefficient,
evictableProber: tt.fields.evictableProber,
dvfsTracker: dvfsTracker{dvfsAccumEffect: tt.fields.dvfsUsed},
metricsReader: nil,
}
if got := e.RecommendAction(tt.args.actualWatt, tt.args.desiredWatt, tt.args.alert, tt.args.internalOp, tt.args.ttl); !reflect.DeepEqual(got, tt.want) {
t.Errorf("RecommendAction() = %v, want %v", got, tt.want)
Expand Down
25 changes: 15 additions & 10 deletions pkg/agent/sysadvisor/plugin/poweraware/advisor/advisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/capper"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/evictor"
powermetric "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/metric"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/reader"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/spec"
"github.com/kubewharf/katalyst-core/pkg/config/generic"
Expand All @@ -40,12 +41,6 @@ const (
// 9 seconds between actions since RAPL/HSMP capping needs 4-6 seconds to stabilize itself
// and malachite realtime metric server imposes delay of up to 2 seconds
intervalSpecFetch = time.Second * 9

metricPowerAwareCurrentPowerInWatt = "power_current_watt"
metricPowerAwareDesiredPowerInWatt = "power_desired_watt"
metricPowerAwareActionPlan = "power_action_plan"
metricTagNameActionPlanOp = "op"
metricTagNameActionPlanMode = "mode"
)

// PowerAwareAdvisor is the interface that runs the whole power advisory process
Expand Down Expand Up @@ -74,20 +69,25 @@ func (p *powerAwareAdvisor) Init() error {
return errors.New("no power reader is provided")
}
if err := p.powerReader.Init(); err != nil {
p.emitErrorCode(powermetric.ErrorCodeInitFailure)
return errors.Wrap(err, "failed to initialize power reader")
}

if p.podEvictor == nil {
p.emitErrorCode(powermetric.ErrorCodeInitFailure)
return errors.New("no pod eviction server is provided")
}
if err := p.podEvictor.Init(); err != nil {
p.emitErrorCode(powermetric.ErrorCodeInitFailure)
return errors.Wrap(err, "failed to initialize evict service")
}

if p.powerCapper == nil {
p.emitErrorCode(powermetric.ErrorCodeInitFailure)
return errors.New("no power capping server is provided")
}
if err := p.powerCapper.Init(); err != nil {
p.emitErrorCode(powermetric.ErrorCodeInitFailure)
return errors.Wrap(err, "failed to initialize power capping server")
}

Expand All @@ -97,10 +97,12 @@ func (p *powerAwareAdvisor) Init() error {
func (p *powerAwareAdvisor) Run(ctx context.Context) {
general.Infof("pap: advisor Run started")
if err := p.podEvictor.Start(); err != nil {
p.emitErrorCode(powermetric.ErrorCodeStartFailure)
general.Errorf("pap: failed to start pod evict service: %v", err)
return
}
if err := p.powerCapper.Start(); err != nil {
p.emitErrorCode(powermetric.ErrorCodeStartFailure)
general.Errorf("pap: failed to start power capping service: %v", err)
return
}
Expand All @@ -126,6 +128,7 @@ func (p *powerAwareAdvisor) cleanup() {
func (p *powerAwareAdvisor) run(ctx context.Context) {
powerSpec, err := p.specFetcher.GetPowerSpec(ctx)
if err != nil {
p.emitErrorCode(powermetric.ErrorCodePowerSpecFormat)
klog.Errorf("pap: getting power spec failed: %#v", err)
return
}
Expand All @@ -150,20 +153,21 @@ func (p *powerAwareAdvisor) run(ctx context.Context) {

currentWatts, err := p.powerReader.Get(ctx)
if err != nil {
p.emitErrorCode(powermetric.ErrorCodePowerGetCurrentUsage)
klog.Errorf("pap: reading power failed: %#v", err)
return
}

klog.V(6).Infof("pap: current power usage: %d watts", currentWatts)

// report metrics: current power reading, desired power value
_ = p.emitter.StoreInt64(metricPowerAwareCurrentPowerInWatt, int64(currentWatts), metrics.MetricTypeNameRaw)
_ = p.emitter.StoreInt64(metricPowerAwareDesiredPowerInWatt, int64(powerSpec.Budget), metrics.MetricTypeNameRaw)
p.emitCurrentPowerUSage(currentWatts)
p.emitPowerSpec(powerSpec)

freqCapped, err := p.reconciler.Reconcile(ctx, powerSpec, currentWatts)
if err != nil {
p.emitErrorCode(powermetric.ErrorCodeRecoverable)
general.Errorf("pap: reconcile error: %v", err)
// todo: report to metric dashboard
return
}

Expand All @@ -183,13 +187,14 @@ func NewAdvisor(dryRun bool,
capper capper.PowerCapper,
metricsReader metrictypes.MetricsReader,
) PowerAwareAdvisor {
percentageEvictor := evictor.NewPowerLoadEvict(qosConfig, emitter, podFetcher, podEvictor)
return &powerAwareAdvisor{
emitter: emitter,
specFetcher: spec.NewFetcher(nodeFetcher, annotationKeyPrefix),
powerReader: reader,
podEvictor: podEvictor,
powerCapper: capper,
reconciler: newReconciler(dryRun, metricsReader, emitter, evictor.NewPowerLoadEvict(qosConfig, podFetcher, podEvictor), capper),
reconciler: newReconciler(dryRun, metricsReader, emitter, percentageEvictor, capper),
inFreqCap: false,
}
}
34 changes: 34 additions & 0 deletions pkg/agent/sysadvisor/plugin/poweraware/advisor/advisor_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package advisor

import (
powermetric "github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/metric"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/spec"
)

// emitCurrentPowerUSage forwards currentWatts to powermetric.EmitCurrentPowerUSage
// together with the advisor's emitter.
// NOTE(review): the "USage" capitalization looks like a typo of "Usage"; it is
// kept because the name is shared with the powermetric helper and call sites.
func (p *powerAwareAdvisor) emitCurrentPowerUSage(currentWatts int) {
powermetric.EmitCurrentPowerUSage(p.emitter, currentWatts)
}

// emitPowerSpec forwards powerSpec to powermetric.EmitPowerSpec together with
// the advisor's emitter.
func (p *powerAwareAdvisor) emitPowerSpec(powerSpec *spec.PowerSpec) {
powermetric.EmitPowerSpec(p.emitter, powerSpec)
}

// emitErrorCode forwards errorCause to powermetric.EmitErrorCode together with
// the advisor's emitter. Callers pass one of the powermetric.ErrorCode* values.
func (p *powerAwareAdvisor) emitErrorCode(errorCause powermetric.ErrorCause) {
powermetric.EmitErrorCode(p.emitter, errorCause)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/evictor"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/reader"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/poweraware/spec"
"github.com/kubewharf/katalyst-core/pkg/metrics"
metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool"
)

Expand Down Expand Up @@ -168,6 +169,7 @@ func Test_powerAwareAdvisor_run_abort_on_spec_fetcher_error(t *testing.T) {
depPowerReader := &dummyPowerReader{}

advisor := powerAwareAdvisor{
emitter: &metrics.DummyMetrics{},
specFetcher: &depSpecFetcher,
powerReader: depPowerReader,
}
Expand Down Expand Up @@ -250,6 +252,7 @@ func Test_powerAwareAdvisor_Run_does_Init_Cleanup(t *testing.T) {
depPodEvictor := &dummyPodEvictor{}

advisor := powerAwareAdvisor{
emitter: &metrics.DummyMetrics{},
powerReader: depPowerReader,
podEvictor: depPodEvictor,
}
Expand Down
16 changes: 3 additions & 13 deletions pkg/agent/sysadvisor/plugin/poweraware/advisor/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,6 @@ func (p *powerReconciler) OnDVFSReset() {
p.strategy.OnDVFSReset()
}

// emitOpCode reports one count of the action-plan metric, tagged with the
// action's op string and the given mode (callers pass "dryRun" or "real").
func (p *powerReconciler) emitOpCode(action action.PowerAction, mode string) {
	tags := metrics.ConvertMapToTags(map[string]string{
		metricTagNameActionPlanOp:   action.Op.String(),
		metricTagNameActionPlanMode: mode,
	})
	// The emitter's error is deliberately discarded.
	_ = p.emitter.StoreInt64(metricPowerAwareActionPlan, 1, metrics.MetricTypeNameCount, tags...)
}

func (p *powerReconciler) Reconcile(ctx context.Context, desired *spec.PowerSpec, actual int) (bool, error) {
alertTimeLimit, err := spec.GetPowerAlertResponseTimeLimit(desired.Alert)
if err != nil {
Expand All @@ -78,13 +68,13 @@ func (p *powerReconciler) Reconcile(ctx context.Context, desired *spec.PowerSpec
return false, nil
}
general.Infof("pap: dryRun: %s", actionPlan)
p.emitOpCode(actionPlan, "dryRun")
p.emitPowerAdvice(actionPlan, "dryRun")
p.priorAction = actionPlan
return false, nil
}

general.InfofV(6, "pap: reconcile action %#v", actionPlan)
p.emitOpCode(actionPlan, "real")
p.emitPowerAdvice(actionPlan, "real")

switch actionPlan.Op {
case spec.InternalOpFreqCap:
Expand All @@ -107,7 +97,7 @@ func newReconciler(dryRun bool, metricsReader metrictypes.MetricsReader, emitter
priorAction: action.PowerAction{},
evictor: evictor,
capper: capper,
strategy: strategy.NewEvictFirstStrategy(evictor, metricsReader),
strategy: strategy.NewEvictFirstStrategy(emitter, evictor, metricsReader, capper),
emitter: emitter,
}
}
Loading

0 comments on commit f3d81a4

Please sign in to comment.