Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle batching in shelf plugin #1429

Merged
merged 14 commits into from
Nov 14, 2022
126 changes: 91 additions & 35 deletions cmd/collectors/zapi/plugins/shelf/shelf.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@ import (
"github.com/netapp/harvest/v2/pkg/matrix"
"github.com/netapp/harvest/v2/pkg/tree/node"
"github.com/netapp/harvest/v2/pkg/util"
"strconv"
"strings"
"time"
)

const BatchSize = "500"

type Shelf struct {
*plugin.AbstractPlugin
data map[string]*matrix.Matrix
instanceKeys map[string]string
instanceLabels map[string]*dict.Dict
batchSize string
client *zapi.Client
query string
}
Expand Down Expand Up @@ -139,51 +144,100 @@ func (my *Shelf) Init() error {
}

my.Logger.Debug().Msgf("initialized with data [%d] objects", len(my.data))

// setup batchSize for request
if my.client.IsClustered() {
if b := my.Params.GetChildContentS("batch_size"); b != "" {
if _, err := strconv.Atoi(b); err == nil {
my.batchSize = b
}
} else {
my.batchSize = BatchSize
}
}
return nil
}

func (my *Shelf) Run(data *matrix.Matrix) ([]*matrix.Matrix, error) {

var (
result *node.Node
err error
result *node.Node
ad, pd time.Duration // Request/API time, Parse time, Fetch time
output []*matrix.Matrix
shelves int
numMetrics int
err error
)

apiT := 0 * time.Second
parseT := 0 * time.Second

if !my.client.IsClustered() {
for _, instance := range data.GetInstances() {
instance.SetLabel("shelf", instance.GetLabel("shelf_id"))
}
}

if result, err = my.client.InvokeRequestString(my.query); err != nil {
return nil, err
}

// Set all global labels from zapi.go if already not exist
for a := range my.instanceLabels {
my.data[a].SetGlobalLabels(data.GetGlobalLabels())
}

var output []*matrix.Matrix
request := node.NewXMLS(my.query)
tag := "initial"
totalShelves := 0
totalMetrics := 0

if my.client.IsClustered() {
output, err = my.handleCMode(result)
} else {
output, err = my.handle7Mode(result)
}
if err != nil {
return output, err
if my.client.IsClustered() && my.batchSize != "" {
request.NewChildS("max-records", my.batchSize)
}

if my.client.IsClustered() {
return my.calculateEnvironmentMetrics(output, data)
} else {
return output, nil
for {
if result, tag, ad, pd, err = my.client.InvokeBatchWithTimers(request, tag); err != nil {
return nil, err
}

if result == nil {
break
}

apiT += ad
parseT += pd

var batchOutput []*matrix.Matrix

if my.client.IsClustered() {
batchOutput, shelves, numMetrics, err = my.handleCMode(result)
} else {
batchOutput, shelves, numMetrics, err = my.handle7Mode(result)
}

if err != nil {
return output, err
}

if my.client.IsClustered() {
if err = my.calculateEnvironmentMetrics(data); err != nil {
return output, err
}
}
totalShelves += shelves
totalMetrics += numMetrics
output = append(output, batchOutput...)
}

my.Logger.Info().
Int("numShelves", totalShelves).
Int("metrics", totalMetrics).
Str("apiD", apiT.Round(time.Millisecond).String()).
Str("parseD", parseT.Round(time.Millisecond).String()).
Str("batchSize", my.batchSize).
Msg("Collected")
return output, nil

}

func (my *Shelf) calculateEnvironmentMetrics(output []*matrix.Matrix, data *matrix.Matrix) ([]*matrix.Matrix, error) {
func (my *Shelf) calculateEnvironmentMetrics(data *matrix.Matrix) error {
var err error
shelfEnvironmentMetricMap := make(map[string]*shelfEnvironmentMetric, 0)
for _, o := range my.data {
Expand Down Expand Up @@ -326,25 +380,25 @@ func (my *Shelf) calculateEnvironmentMetrics(output []*matrix.Matrix, data *matr
}
}
}
return output, nil
return nil
}

func (my *Shelf) handleCMode(result *node.Node) ([]*matrix.Matrix, error) {
func (my *Shelf) handleCMode(result *node.Node) ([]*matrix.Matrix, int, int, error) {
var (
shelves []*node.Node
shelves []*node.Node
numMetrics int
output []*matrix.Matrix
)

if x := result.GetChildS("attributes-list"); x != nil {
shelves = x.GetChildren()
}
if len(shelves) == 0 {
return nil, errs.New(errs.ErrNoInstance, "no shelf instances found")
return nil, 0, 0, errs.New(errs.ErrNoInstance, "no shelf instances found")
}

my.Logger.Debug().Msgf("fetching %d shelf counters", len(shelves))

var output []*matrix.Matrix

// Purge and reset data
for _, data1 := range my.data {
data1.PurgeInstances()
Expand Down Expand Up @@ -386,7 +440,7 @@ func (my *Shelf) handleCMode(result *node.Node) ([]*matrix.Matrix, error) {

if err != nil {
my.Logger.Error().Err(err).Str("attribute", attribute).Msg("Failed to add instance")
return nil, err
return nil, 0, 0, err
}
my.Logger.Debug().Msgf("add (%s) instance: %s.%s", attribute, shelfID, key)

Expand All @@ -413,6 +467,7 @@ func (my *Shelf) handleCMode(result *node.Node) ([]*matrix.Matrix, error) {
if err := m.SetValueString(instance, value); err != nil {
my.Logger.Debug().Msgf("(%s) failed to parse value (%s): %v", metricKey, value, err)
} else {
numMetrics++
my.Logger.Debug().Msgf("(%s) added value (%s)", metricKey, value)
}
}
Expand All @@ -428,23 +483,23 @@ func (my *Shelf) handleCMode(result *node.Node) ([]*matrix.Matrix, error) {
}
}

return output, nil
return output, len(shelves), numMetrics, nil
}

func (my *Shelf) handle7Mode(result *node.Node) ([]*matrix.Matrix, error) {
func (my *Shelf) handle7Mode(result *node.Node) ([]*matrix.Matrix, int, int, error) {
var (
shelves []*node.Node
channels []*node.Node
shelves []*node.Node
channels []*node.Node
output []*matrix.Matrix
numMetrics int
)
//fallback to 7mode
channels = result.SearchChildren([]string{"shelf-environ-channel-info"})

if len(channels) == 0 {
return nil, errs.New(errs.ErrNoInstance, "no channels found")
return nil, 0, 0, errs.New(errs.ErrNoInstance, "no channels found")
}

var output []*matrix.Matrix

// Purge and reset data
for _, data1 := range my.data {
data1.PurgeInstances()
Expand Down Expand Up @@ -490,7 +545,7 @@ func (my *Shelf) handle7Mode(result *node.Node) ([]*matrix.Matrix, error) {

if err != nil {
my.Logger.Error().Msgf("add (%s) instance: %v", attribute, err)
return nil, err
return nil, 0, 0, err
}
my.Logger.Debug().Msgf("add (%s) instance: %s.%s", attribute, shelfID, key)

Expand Down Expand Up @@ -519,6 +574,7 @@ func (my *Shelf) handle7Mode(result *node.Node) ([]*matrix.Matrix, error) {
if err := m.SetValueString(instance, value); err != nil {
my.Logger.Debug().Msgf("(%s) failed to parse value (%s): %v", metricKey, value, err)
} else {
numMetrics++
my.Logger.Debug().Msgf("(%s) added value (%s)", metricKey, value)
}
}
Expand All @@ -534,5 +590,5 @@ func (my *Shelf) handle7Mode(result *node.Node) ([]*matrix.Matrix, error) {
}
}
}
return output, nil
return output, len(shelves), numMetrics, nil
}