Skip to content

Commit

Permalink
perf: RestPerf collectors should only run PollInstance for workloads
Browse files Browse the repository at this point in the history
Fixes: #2421
  • Loading branch information
cgrinds committed Oct 14, 2024
1 parent dcc5d66 commit 99ae8bf
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 18 deletions.
63 changes: 51 additions & 12 deletions cmd/collectors/restperf/restperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/netapp/harvest/v2/pkg/tree/node"
"github.com/netapp/harvest/v2/pkg/util"
"github.com/tidwall/gjson"
"iter"
"log/slog"
"maps"
"path"
Expand Down Expand Up @@ -65,9 +66,10 @@ var qosDetailQueries = map[string]string{
}

type RestPerf struct {
*rest2.Rest // provides: AbstractCollector, Client, Object, Query, TemplateFn, TemplateType
perfProp *perfProp
archivedMetrics map[string]*rest2.Metric // Keeps metric definitions that are not found in the counter schema. These metrics may be available in future ONTAP versions.
*rest2.Rest // provides: AbstractCollector, Client, Object, Query, TemplateFn, TemplateType
perfProp *perfProp
archivedMetrics map[string]*rest2.Metric // Keeps metric definitions that are not found in the counter schema. These metrics may be available in future ONTAP versions.
hasInstanceSchedule bool
}

type counter struct {
Expand Down Expand Up @@ -142,6 +144,8 @@ func (r *RestPerf) Init(a *collector.AbstractCollector) error {
return err
}

r.InitSchedule()

r.Logger.Debug(
"initialized cache",
slog.Int("numMetrics", len(r.Prop.Metrics)),
Expand Down Expand Up @@ -789,6 +793,14 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
}

// Call pollInstance to handle instance creation/deletion for objects without an instance schedule
if !r.hasInstanceSchedule {
_, err = r.pollInstance(curMat, perfToJSON(perfRecords), apiD)
if err != nil {
return nil, err
}
}

for _, perfRecord := range perfRecords {
pr := perfRecord.Records
t := perfRecord.Timestamp
Expand Down Expand Up @@ -1299,6 +1311,18 @@ func (r *RestPerf) pollData(startTime time.Time, perfRecords []rest.PerfRecord)
return newDataMap, nil
}

// perfToJSON returns an iterator over the individual JSON elements contained
// in the given perf records. For each record whose Records value is a JSON
// array, every array element is yielded in order; records whose Records value
// is not an array are skipped.
//
// The iterator honors early termination: once yield returns false (for
// example, the consumer breaks out of its range loop), no further elements
// are produced from the current or any subsequent record. Calling yield again
// after it has returned false would cause a runtime panic in a range-over-func
// loop, so the stop signal is propagated out of the nested ForEach callback.
func perfToJSON(records []rest.PerfRecord) iter.Seq[gjson.Result] {
	return func(yield func(gjson.Result) bool) {
		for _, record := range records {
			if !record.Records.IsArray() {
				continue
			}

			// ForEach only stops its own iteration when the callback returns
			// false, so remember the consumer's decision to stop the outer loop.
			proceed := true
			record.Records.ForEach(func(_, r gjson.Result) bool {
				proceed = yield(r)
				return proceed
			})
			if !proceed {
				return
			}
		}
	}
}

// Poll counter "ops" of the related/parent object, required for objects
// workload_detail and workload_detail_volume. This counter is already
// collected by the other collectors, so this poll is redundant
Expand Down Expand Up @@ -1471,17 +1495,17 @@ func (r *RestPerf) PollInstance() (map[string]*matrix.Matrix, error) {
return r.handleError(err, href)
}

return r.pollInstance(records, time.Since(apiT))
return r.pollInstance(r.Matrix[r.Object], slices.Values(records), time.Since(apiT))
}

func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map[string]*matrix.Matrix, error) {
func (r *RestPerf) pollInstance(mat *matrix.Matrix, records iter.Seq[gjson.Result], apiD time.Duration) (map[string]*matrix.Matrix, error) {
var (
err error
oldInstances *set.Set
oldSize, newSize, removed, added int
count int
)

mat := r.Matrix[r.Object]
oldInstances = set.New()
parseT := time.Now()
for key := range mat.GetInstances() {
Expand All @@ -1497,15 +1521,13 @@ func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map
instanceKeys = []string{"name"}
}

if len(records) == 0 {
return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
}

for _, instanceData := range records {
for instanceData := range records {
var (
instanceKey string
)

count++

if !instanceData.IsObject() {
r.Logger.Warn("Instance data is not object, skipping", slog.String("type", instanceData.Type.String()))
continue
Expand Down Expand Up @@ -1550,6 +1572,10 @@ func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map
}
}

if count == 0 {
return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
}

for key := range oldInstances.Iter() {
mat.RemoveInstance(key)
r.Logger.Debug("removed instance", slog.String("key", key))
Expand All @@ -1569,7 +1595,7 @@ func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map
_ = r.Metadata.LazySetValueUint64("numCalls", "instance", r.Client.Metadata.NumCalls)

if newSize == 0 {
return nil, errs.New(errs.ErrNoInstance, "")
return nil, errs.New(errs.ErrNoInstance, "no "+r.Object+" instances on cluster")
}

return nil, err
Expand Down Expand Up @@ -1603,6 +1629,19 @@ func (r *RestPerf) handleError(err error, href string) (map[string]*matrix.Matri
return nil, fmt.Errorf("failed to fetch data. href=[%s] err: %w", href, err)
}

// InitSchedule records whether the collector's schedule contains a task named
// "instance" by setting r.hasInstanceSchedule to true when one is found. It
// does nothing when the collector has no schedule, and it never resets the
// flag to false.
func (r *RestPerf) InitSchedule() {
	if r.Schedule == nil {
		return
	}

	for _, t := range r.Schedule.GetTasks() {
		if t.Name != "instance" {
			continue
		}
		// One matching task is enough; no need to look at the rest.
		r.hasInstanceSchedule = true
		return
	}
}

func isWorkloadObject(query string) bool {
_, ok := qosQueries[query]
return ok
Expand Down
12 changes: 7 additions & 5 deletions cmd/collectors/restperf/restperf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func TestMain(m *testing.M) {
propertiesData = jsonToPerfRecords("testdata/volume-poll-properties.json.gz")
fullPollData = jsonToPerfRecords("testdata/volume-poll-full.json.gz")
fullPollData[0].Timestamp = now.UnixNano()
_, _ = benchPerf.pollInstance(propertiesData[0].Records.Array(), 0)
mat := matrix.New("Volume", "Volume", "Volume")
_, _ = benchPerf.pollInstance(mat, perfToJSON(propertiesData), 0)
_, _ = benchPerf.pollData(now, fullPollData)

os.Exit(m.Run())
Expand All @@ -98,7 +99,8 @@ func BenchmarkRestPerf_PollData(b *testing.B) {
for range b.N {
now = now.Add(time.Minute * 15)
fullPollData[0].Timestamp = now.UnixNano()
mi, _ := benchPerf.pollInstance(propertiesData[0].Records.Array(), 0)
mat := matrix.New("Volume", "Volume", "Volume")
mi, _ := benchPerf.pollInstance(mat, perfToJSON(propertiesData), 0)
for _, mm := range mi {
ms = append(ms, mm)
}
Expand Down Expand Up @@ -166,7 +168,7 @@ func TestRestPerf_pollData(t *testing.T) {
}
pollInstance := jsonToPerfRecords(tt.pollInstance)
pollData := jsonToPerfRecords(tt.pollDataPath1)
_, err = r.pollInstance(pollInstance[0].Records.Array(), 0)
_, err = r.pollInstance(r.Matrix[r.Object], perfToJSON(pollInstance), 0)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -295,7 +297,7 @@ func TestPartialAggregationSequence(t *testing.T) {
t.Fatalf("Failed to fetch poll counter %v", err)
}
pollInstance := jsonToPerfRecords("testdata/partialAggregation/qos-poll-instance.json")
_, err = r.pollInstance(pollInstance[0].Records.Array(), 0)
_, err = r.pollInstance(r.Matrix[r.Object], perfToJSON(pollInstance), 0)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -424,7 +426,7 @@ func TestQosVolume(t *testing.T) {
}

pollInst := jsonToPerfRecords(tt.pollInstance)
_, err = r.pollInstance(pollInst[0].Records.Array(), 0)
_, err = r.pollInstance(r.Matrix[r.Object], perfToJSON(pollInst), 0)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion integration/test/grafana/grafana_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (g *Mgr) Import() (bool, string) {
if docker.IsDockerBasedPoller() {
grafanaURL = "grafana:3000"
}
importCmds := []string{"grafana", "import", "--overwrite", "--addr", grafanaURL}
importCmds := []string{"grafana", "import", "--addr", grafanaURL}
if docker.IsDockerBasedPoller() {
params := []string{"exec", containerIDs[0].ID, "bin/harvest"}
params = append(params, importCmds...)
Expand Down

0 comments on commit 99ae8bf

Please sign in to comment.