fix(outputs.azure_monitor): Prevent infinite send loop for outdated metrics #16448

Merged · 2 commits · Feb 4, 2025

45 changes: 37 additions & 8 deletions plugins/outputs/azure_monitor/README.md
@@ -8,21 +8,15 @@ them to the service on every flush interval.
> [!IMPORTANT]
> The Azure Monitor custom metrics service is currently in preview and might
> not be available in all Azure regions.
> Please also take the [metric time limitations](#metric-time-limitations) into
> account!

The metrics from each input plugin will be written to a separate Azure Monitor
namespace, prefixed with `Telegraf/` by default. The field name for each metric
is written as the Azure Monitor metric name. All field values are written as a
summarized set that includes: min, max, sum, count. Tags are written as a
dimension on each Azure Monitor metric.

> [!NOTE]
> Azure Monitor won't accept metrics that are too far in the past or future.
> Keep this in mind when configuring your output buffer limits or other
> variables, such as flush intervals, or when using input sources that could
> cause metrics to be out of this allowed range.
> Currently, the timestamp should not be older than 30 minutes or more than
> 4 minutes in the future at the time when it is sent to Azure Monitor service.

⭐ Telegraf v1.8.0
🏷️ cloud, datastore
💻 all
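
For illustration, here is a minimal, hypothetical Go sketch (not plugin code) of the summarized set described above: every field value observed in a time bucket is reduced to min, max, sum, and count.

```go
package main

import "fmt"

// aggregate mirrors the summarized set the README describes: each field's
// values within a time bucket collapse into min, max, sum, and count.
type aggregate struct {
	Min, Max, Sum float64
	Count         int64
}

func (a *aggregate) add(v float64) {
	if a.Count == 0 || v < a.Min {
		a.Min = v
	}
	if a.Count == 0 || v > a.Max {
		a.Max = v
	}
	a.Sum += v
	a.Count++
}

func main() {
	var agg aggregate
	for _, v := range []float64{1, 5, 3} {
		agg.add(v)
	}
	fmt.Printf("%+v\n", agg) // {Min:1 Max:5 Sum:9 Count:3}
}
```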
@@ -70,6 +64,14 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## cloud environment, set the appropriate REST endpoint for receiving
## metrics. (Note: region may be unused in this context)
# endpoint_url = "https://monitoring.core.usgovcloudapi.net"

## Time limitations of metric to send
## Documentation can be found here:
## https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-store-custom-rest-api?tabs=rest#timestamp
## However, the returned (400) error message might document stricter or
## more relaxed settings. By default, only past metrics within the limit are sent.
# timestamp_limit_past = "30m"
# timestamp_limit_future = "-1m"
```

## Setup
@@ -175,3 +177,30 @@ modifiers][conf-modifiers] to limit the string-typed fields that are sent to
the plugin.

[conf-modifiers]: ../../../docs/CONFIGURATION.md#modifiers

## Metric time limitations

Azure Monitor won't accept metrics too far in the past or future. Keep this in
mind when configuring your output buffer limits or other variables, such as
flush intervals, or when using input sources that could cause metrics to be
out of this allowed range.

According to the [documentation][timestamp_docs], the timestamp should not be
older than 20 minutes or more than 5 minutes in the future at the time when the
metric is sent to the Azure Monitor service. However, HTTP `400` error messages
returned by the service might specify other values such as 30 minutes in the
past and 4 minutes in the future.

You can control the timeframe actually sent using the `timestamp_limit_past` and
`timestamp_limit_future` settings. By default, only metrics between 30 minutes
and one minute in the past are sent. The lower limit represents the more
permissive value received in `400` error messages. The upper limit leaves
enough time for aggregation to happen by not sending aggregations too early.

> [!IMPORTANT]
> When adapting the limits, you need to take into account both the limits
> permitted by the service and the latency of sending metrics. Furthermore, you
> should not send metrics too early, as in that case aggregation might not have
> happened yet and the values would be misleading.

[timestamp_docs]: https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/metrics-store-custom-rest-api?tabs=rest#timestamp
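
The acceptance window described above can be pictured as the sliding interval `[now - timestamp_limit_past, now + timestamp_limit_future]`. The following is a minimal standalone Go sketch of this check, using a hypothetical helper name; it mirrors the logic the plugin's `Write` method applies in the diff below.

```go
package main

import (
	"fmt"
	"time"
)

// withinWindow reports whether a metric timestamp falls inside the
// acceptance window. With the defaults timestamp_limit_past = "30m" and
// timestamp_limit_future = "-1m", only timestamps between now-30m and
// now-1m pass; a negative "future" limit keeps the whole window in the past.
func withinWindow(ts, now time.Time, past, future time.Duration) bool {
	earliest := now.Add(-past)
	latest := now.Add(future)
	return !ts.Before(earliest) && !ts.After(latest)
}

func main() {
	now := time.Now()
	fmt.Println(withinWindow(now.Add(-10*time.Minute), now, 30*time.Minute, -time.Minute)) // true
	fmt.Println(withinWindow(now.Add(-45*time.Minute), now, 30*time.Minute, -time.Minute)) // false: too old
	fmt.Println(withinWindow(now, now, 30*time.Minute, -time.Minute))                      // false: too recent to be aggregated
}
```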
129 changes: 91 additions & 38 deletions plugins/outputs/azure_monitor/azure_monitor.go
@@ -56,13 +56,15 @@ type aggregate struct {
}

type AzureMonitor struct {
Timeout config.Duration `toml:"timeout"`
NamespacePrefix string `toml:"namespace_prefix"`
StringsAsDimensions bool `toml:"strings_as_dimensions"`
Region string `toml:"region"`
ResourceID string `toml:"resource_id"`
EndpointURL string `toml:"endpoint_url"`
Log telegraf.Logger `toml:"-"`
Timeout config.Duration `toml:"timeout"`
NamespacePrefix string `toml:"namespace_prefix"`
StringsAsDimensions bool `toml:"strings_as_dimensions"`
Region string `toml:"region"`
ResourceID string `toml:"resource_id"`
EndpointURL string `toml:"endpoint_url"`
TimestampLimitPast config.Duration `toml:"timestamp_limit_past"`
TimestampLimitFuture config.Duration `toml:"timestamp_limit_future"`
Log telegraf.Logger `toml:"-"`

url string
preparer autorest.Preparer
@@ -91,6 +93,13 @@ func (a *AzureMonitor) Init() error {
}

func (a *AzureMonitor) Connect() error {
a.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: time.Duration(a.Timeout),
}

// If information is missing try to retrieve it from the Azure VM instance
if a.Region == "" || a.ResourceID == "" {
region, resourceID, err := vmInstanceMetadata(a.client)
@@ -119,6 +128,7 @@ func (a *AzureMonitor) Connect() error {
} else {
a.url = a.EndpointURL + a.ResourceID + "/metrics"
}
a.Log.Debugf("Writing to Azure Monitor URL: %s", a.url)

a.MetricOutsideWindow = selfstat.Register(
"azure_monitor",
@@ -129,15 +139,6 @@
},
)

a.Log.Debugf("Writing to Azure Monitor URL: %s", a.url)

a.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: time.Duration(a.Timeout),
}

a.Reset()

return nil
@@ -155,7 +156,7 @@ func (a *AzureMonitor) Add(m telegraf.Metric) {
// Azure Monitor only supports aggregates 30 minutes into the past and 4
// minutes into the future. Future metrics are dropped when pushed.
tbucket := m.Time().Truncate(time.Minute)
if tbucket.Before(a.timeFunc().Add(-30 * time.Minute)) {
if tbucket.Before(a.timeFunc().Add(-time.Duration(a.TimestampLimitPast))) {
a.MetricOutsideWindow.Incr(1)
return
}
@@ -226,7 +227,7 @@ func (a *AzureMonitor) Push() []telegraf.Metric {
var metrics []telegraf.Metric
for tbucket, aggs := range a.cache {
// Do not send metrics early
if tbucket.After(a.timeFunc().Add(-time.Minute)) {
if tbucket.After(a.timeFunc().Add(time.Duration(a.TimestampLimitFuture))) {
continue
}
for _, agg := range aggs {
@@ -261,13 +262,13 @@ func (a *AzureMonitor) Push() []telegraf.Metric {
func (a *AzureMonitor) Reset() {
for tbucket := range a.cache {
// Remove aggregates older than 30 minutes
if tbucket.Before(a.timeFunc().Add(-30 * time.Minute)) {
if tbucket.Before(a.timeFunc().Add(-time.Duration(a.TimestampLimitPast))) {
delete(a.cache, tbucket)
continue
}
// Metrics updated within the latest 1m have not been pushed and should
// not be cleared.
if tbucket.After(a.timeFunc().Add(-1 * time.Minute)) {
if tbucket.After(a.timeFunc().Add(time.Duration(a.TimestampLimitFuture))) {
continue
}
for id := range a.cache[tbucket] {
@@ -278,45 +279,80 @@

// Write writes metrics to the remote endpoint
func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
now := a.timeFunc()
tsEarliest := now.Add(-time.Duration(a.TimestampLimitPast))
tsLatest := now.Add(time.Duration(a.TimestampLimitFuture))

writeErr := &internal.PartialWriteError{
MetricsAccept: make([]int, 0, len(metrics)),
}
azmetrics := make(map[uint64]*azureMonitorMetric, len(metrics))
for _, m := range metrics {
for i, m := range metrics {
// Skip metrics that are outside of the valid timespan
if m.Time().Before(tsEarliest) || m.Time().After(tsLatest) {
a.Log.Tracef("Metric outside acceptable time window: %v", m)
a.MetricOutsideWindow.Incr(1)
writeErr.Err = errors.New("metric(s) outside of acceptable time window")
writeErr.MetricsReject = append(writeErr.MetricsReject, i)
continue
}

amm, err := translate(m, a.NamespacePrefix)
if err != nil {
a.Log.Errorf("Could not create azure metric for %q; discarding point", m.Name())
if writeErr.Err == nil {
writeErr.Err = errors.New("translating metric(s) failed")
}
writeErr.MetricsReject = append(writeErr.MetricsReject, i)
continue
}

id := hashIDWithTagKeysOnly(m)
if azm, ok := azmetrics[id]; !ok {
azmetrics[id] = amm
azmetrics[id].index = i
} else {
azmetrics[id].Data.BaseData.Series = append(
azm.Data.BaseData.Series,
amm.Data.BaseData.Series...,
)
azmetrics[id].index = i
}
}

if len(azmetrics) == 0 {
return nil
if writeErr.Err == nil {
return nil
}
return writeErr
}

var buffer bytes.Buffer
buffer.Grow(maxRequestBodySize)
batchIndices := make([]int, 0, len(azmetrics))
for _, m := range azmetrics {
// Azure Monitor accepts new batches of points in new-line delimited
// JSON, following RFC 4288 (see https://github.com/ndjson/ndjson-spec).
buf, err := json.Marshal(m)
if err != nil {
a.Log.Errorf("Could not marshall metric to JSON: %v", err)
writeErr.MetricsReject = append(writeErr.MetricsReject, m.index)
writeErr.Err = err
continue
}
batchIndices = append(batchIndices, m.index)

// Azure Monitor's maximum request body size of 4MB. Send batches that
// exceed this size via separate write requests.
if buffer.Len()+len(buf)+1 > maxRequestBodySize {
if err := a.send(buffer.Bytes()); err != nil {
return err
if retryable, err := a.send(buffer.Bytes()); err != nil {
writeErr.Err = err
if !retryable {
writeErr.MetricsReject = append(writeErr.MetricsReject, batchIndices...)
}
return writeErr
}
writeErr.MetricsAccept = append(writeErr.MetricsAccept, batchIndices...)
batchIndices = make([]int, 0, len(azmetrics))
buffer.Reset()
}
if _, err := buffer.Write(buf); err != nil {
@@ -327,22 +363,35 @@ func (a *AzureMonitor) Write(metrics []telegraf.Metric) error {
}
}

return a.send(buffer.Bytes())
if retryable, err := a.send(buffer.Bytes()); err != nil {
writeErr.Err = err
if !retryable {
writeErr.MetricsReject = append(writeErr.MetricsReject, batchIndices...)
}
return writeErr
}
writeErr.MetricsAccept = append(writeErr.MetricsAccept, batchIndices...)

if writeErr.Err == nil {
return nil
}

return writeErr
}

func (a *AzureMonitor) send(body []byte) error {
func (a *AzureMonitor) send(body []byte) (bool, error) {
var buf bytes.Buffer
g := gzip.NewWriter(&buf)
if _, err := g.Write(body); err != nil {
return fmt.Errorf("zipping content failed: %w", err)
return false, fmt.Errorf("zipping content failed: %w", err)
}
if err := g.Close(); err != nil {
return err
return false, fmt.Errorf("closing gzip writer failed: %w", err)
}

req, err := http.NewRequest("POST", a.url, &buf)
if err != nil {
return fmt.Errorf("creating request failed: %w", err)
return false, fmt.Errorf("creating request failed: %w", err)
}

req.Header.Set("Content-Encoding", "gzip")
@@ -352,7 +401,7 @@ func (a *AzureMonitor) send(body []byte) error {
// refresh the token if needed.
req, err = a.preparer.Prepare(req)
if err != nil {
return fmt.Errorf("unable to fetch authentication credentials: %w", err)
return false, fmt.Errorf("unable to fetch authentication credentials: %w", err)
}

resp, err := a.client.Do(req)
@@ -366,19 +415,20 @@
Timeout: time.Duration(a.Timeout),
}
}
return err
return true, err
}
defer resp.Body.Close()

if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
return nil
return false, nil
}

retryable := resp.StatusCode != 400
if respbody, err := io.ReadAll(resp.Body); err == nil {
return fmt.Errorf("failed to write batch: [%d] %s: %s", resp.StatusCode, resp.Status, string(respbody))
return retryable, fmt.Errorf("failed to write batch: [%d] %s: %s", resp.StatusCode, resp.Status, string(respbody))
}

return fmt.Errorf("failed to write batch: [%d] %s", resp.StatusCode, resp.Status)
return retryable, fmt.Errorf("failed to write batch: [%d] %s", resp.StatusCode, resp.Status)
}

// vmMetadata retrieves metadata about the current Azure VM
@@ -533,12 +583,15 @@ func getIntField(m telegraf.Metric, key string) (int64, error) {
}
return 0, fmt.Errorf("unexpected type: %s: %T", key, fv)
}

func init() {
outputs.Add("azure_monitor", func() telegraf.Output {
return &AzureMonitor{
NamespacePrefix: "Telegraf/",
Timeout: config.Duration(5 * time.Second),
timeFunc: time.Now,
NamespacePrefix: "Telegraf/",
TimestampLimitPast: config.Duration(20 * time.Minute),
TimestampLimitFuture: config.Duration(-1 * time.Minute),
Timeout: config.Duration(5 * time.Second),
timeFunc: time.Now,
}
})
}
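
To make the batch-splitting in `Write` above concrete, here is a self-contained sketch with hypothetical names and a deliberately tiny size limit standing in for the plugin's 4MB maximum request body; it flushes the newline-delimited JSON buffer whenever the next record would overflow it and tracks which input indices each flush covered.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// maxBody is a tiny request-size limit for demonstration only; the plugin
// uses a 4MB maximum request body (maxRequestBodySize).
const maxBody = 16

// flushBatches sketches the strategy used in Write above: marshal each item
// to newline-delimited JSON, flush the buffer whenever the next record would
// exceed the size limit, and remember which input indices each flush covered
// so they can later be reported as accepted or rejected.
func flushBatches(items []map[string]int) {
	var buf bytes.Buffer
	indices := make([]int, 0, len(items))
	for i, it := range items {
		line, err := json.Marshal(it)
		if err != nil {
			continue // a real implementation would record the rejected index
		}
		// +1 accounts for the newline separating NDJSON records.
		if buf.Len()+len(line)+1 > maxBody {
			fmt.Printf("send %d bytes covering indices %v\n", buf.Len(), indices)
			buf.Reset()
			indices = indices[:0]
		}
		buf.Write(line)
		buf.WriteByte('\n')
		indices = append(indices, i)
	}
	fmt.Printf("send %d bytes covering indices %v\n", buf.Len(), indices)
}

func main() {
	flushBatches([]map[string]int{{"a": 1}, {"b": 2}, {"c": 3}, {"d": 4}})
}
```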