Skip to content

Commit

Permalink
Fix: add type to failure metrics
Browse files Browse the repository at this point in the history
sm_agent_push_failed_total has a type label that indicates the type of
failure, not the type of payload that is being pushed. Rename that label
to `reason` and add a `type` label that has values `metrics` and `logs`.

Signed-off-by: Marcelo E. Magallon <[email protected]>
  • Loading branch information
mem committed Sep 13, 2023
1 parent 172ea47 commit b62caa2
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
5 changes: 3 additions & 2 deletions internal/pusher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Metrics struct {
var (
	// labelsWithType is the base label set: region, tenant, and a "type"
	// label.
	// NOTE(review): "type" presumably carries the payload kind (`metrics`
	// or `logs`) — confirm against the LabelValue* constants.
	labelsWithType = []string{
		"regionID",
		"tenantID",
		"type",
	}

	// labelsWithTypeStatus is the base label set plus a "status" label.
	// NOTE(review): looks like "status" holds the HTTP status code of a
	// failed request — confirm against the counter that is registered with
	// this set.
	labelsWithTypeStatus = []string{
		"regionID",
		"tenantID",
		"type",
		"status",
	}

	// labelsWithTypeReason is the base label set plus a "reason" label. It is
	// the label set of the push_failed_total counter, so every observation on
	// that counter must supply four label values in this order.
	labelsWithTypeReason = []string{
		"regionID",
		"tenantID",
		"type",
		"reason",
	}
)

// NewMetrics returns a new set of publisher metrics registered in the given registerer.
Expand Down Expand Up @@ -63,7 +64,7 @@ func NewMetrics(promRegisterer prometheus.Registerer) (m Metrics) {
Name: "push_failed_total",
Help: "Total number of push failures by type.",
},
labelsWithType)
labelsWithTypeReason)

promRegisterer.MustRegister(m.FailedCounter)

Expand Down Expand Up @@ -142,7 +143,7 @@ func (m Metrics) WithType(t string) Metrics {
PushCounter: m.PushCounter.MustCurryWith(typeLabels),
ErrorCounter: m.ErrorCounter.MustCurryWith(typeLabels),
BytesOut: m.BytesOut.MustCurryWith(typeLabels),
FailedCounter: m.FailedCounter, // type in failed counter serves a different purpose.
FailedCounter: m.FailedCounter.MustCurryWith(typeLabels),
RetriesCounter: m.RetriesCounter.MustCurryWith(typeLabels),
DroppedCounter: m.DroppedCounter.MustCurryWith(typeLabels),
ResponseCounter: m.ResponseCounter.MustCurryWith(typeLabels),
Expand Down
33 changes: 25 additions & 8 deletions internal/pusher/v1/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,24 @@ func (p *publisherImpl) publish(ctx context.Context, payload pusher.Payload) {
logger = p.logger.With().Int("region", regionID).Int64("tenant", localID).Logger()
)

streams := payload.Streams()
metrics := payload.Metrics()

for retry := 2; retry > 0; retry-- {
client, err := p.getClient(ctx, tenantID, newClient)
if err != nil {
logger.Error().Err(err).Msg("get client failed")
p.metrics.FailedCounter.WithLabelValues(regionStr, tenantStr, pusher.LabelValueClient).Inc()
if len(streams) > 0 {
p.metrics.FailedCounter.WithLabelValues(regionStr, tenantStr, pusher.LabelValueLogs, pusher.LabelValueClient).Inc()
}
if len(metrics) > 0 {
p.metrics.FailedCounter.WithLabelValues(regionStr, tenantStr, pusher.LabelValueMetrics, pusher.LabelValueClient).Inc()
}
return
}

if len(payload.Streams()) > 0 {
if n, err := p.pushEvents(ctx, client.Events, payload.Streams()); err != nil {
if len(streams) > 0 {
if n, err := p.pushEvents(ctx, client.Events, streams); err != nil {
httpStatusCode, hasStatusCode := prom.GetHttpStatusCode(err)
logger.Error().Err(err).Int("status", httpStatusCode).Msg("publish events")
p.metrics.ErrorCounter.WithLabelValues(regionStr, tenantStr, pusher.LabelValueLogs, strconv.Itoa(httpStatusCode)).Inc()
Expand All @@ -98,11 +106,12 @@ func (p *publisherImpl) publish(ctx context.Context, payload pusher.Payload) {
} else {
p.metrics.PushCounter.WithLabelValues(regionStr, tenantStr, pusher.LabelValueLogs).Inc()
p.metrics.BytesOut.WithLabelValues(regionStr, tenantStr, pusher.LabelValueLogs).Add(float64(n))
streams = nil
}
}

if len(payload.Metrics()) > 0 {
if n, err := p.pushMetrics(ctx, client.Metrics, payload.Metrics()); err != nil {
if len(metrics) > 0 {
if n, err := p.pushMetrics(ctx, client.Metrics, metrics); err != nil {
httpStatusCode, hasStatusCode := prom.GetHttpStatusCode(err)
logger.Error().Err(err).Int("status", httpStatusCode).Msg("publish metrics")
p.metrics.ErrorCounter.WithLabelValues(regionStr, tenantStr, pusher.LabelValueMetrics, strconv.Itoa(httpStatusCode)).Inc()
Expand All @@ -114,15 +123,23 @@ func (p *publisherImpl) publish(ctx context.Context, payload pusher.Payload) {
} else {
p.metrics.PushCounter.WithLabelValues(regionStr, tenantStr, pusher.LabelValueMetrics).Inc()
p.metrics.BytesOut.WithLabelValues(regionStr, tenantStr, pusher.LabelValueMetrics).Add(float64(n))
metrics = nil
}
}

// if we make it here we have sent everything we could send, we are done.
return
if len(streams) == 0 && len(metrics) == 0 {
// if we make it here we have sent everything we could send, we are done.
return
}
}

// if we are here, we retried and failed
p.metrics.FailedCounter.WithLabelValues(regionStr, tenantStr, pusher.LabelValueRetryExhausted).Inc()
if len(streams) > 0 {
p.metrics.FailedCounter.WithLabelValues(regionStr, tenantStr, pusher.LabelValueLogs, pusher.LabelValueRetryExhausted).Inc()
}
if len(metrics) > 0 {
p.metrics.FailedCounter.WithLabelValues(regionStr, tenantStr, pusher.LabelValueMetrics, pusher.LabelValueRetryExhausted).Inc()
}
logger.Warn().Msg("failed to push payload")
}

Expand Down

0 comments on commit b62caa2

Please sign in to comment.