Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial instrumentation to Azure #767

Merged
merged 4 commits into from
Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* [FEATURE] Added the ability to hedge requests with all backends [#750](https://github.com/grafana/tempo/pull/750) (@joe-elliott)
* [ENHANCEMENT] Performance: improve compaction speed with concurrent reads and writes [#754](https://github.com/grafana/tempo/pull/754) (@mdisibio)
* [ENHANCEMENT] Improve readability of cpu and memory metrics on operational dashboard [#764](https://github.com/grafana/tempo/pull/764) (@bboreham)
* [ENHANCEMENT] Add `azure_request_duration_seconds` metric. [#767](https://github.com/grafana/tempo/pull/767) (@JosephWoodward)

## v1.0.1

Expand Down
41 changes: 25 additions & 16 deletions tempodb/backend/azure/azure_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package azure
import (
"context"
"fmt"
"net/http"
"net/url"
"strings"
"time"
Expand All @@ -17,8 +18,8 @@ const (
uptoHedgedRequests = 2
)

func GetContainerURL(ctx context.Context, conf *Config, hedge bool) (blob.ContainerURL, error) {
c, err := blob.NewSharedKeyCredential(conf.StorageAccountName.String(), conf.StorageAccountKey.String())
func GetContainerURL(ctx context.Context, cfg *Config, hedge bool) (blob.ContainerURL, error) {
c, err := blob.NewSharedKeyCredential(cfg.StorageAccountName.String(), cfg.StorageAccountKey.String())
if err != nil {
return blob.ContainerURL{}, err
}
Expand All @@ -31,32 +32,40 @@ func GetContainerURL(ctx context.Context, conf *Config, hedge bool) (blob.Contai
retryOptions.TryTimeout = time.Until(deadline)
}

var httpSender pipeline.Factory
if hedge && conf.HedgeRequestsAt != 0 {
httpSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
client := hedgedhttp.NewClient(conf.HedgeRequestsAt, uptoHedgedRequests, nil)
customTransport := http.DefaultTransport.(*http.Transport).Clone()

// Send the request over the network
resp, err := client.Do(request.WithContext(ctx))
// add instrumentation
transport := newInstrumentedTransport(customTransport)

return pipeline.NewHTTPResponse(resp), err
}
})
// hedge if desired (0 means disabled)
if hedge && cfg.HedgeRequestsAt != 0 {
transport = hedgedhttp.NewRoundTripper(cfg.HedgeRequestsAt, uptoHedgedRequests, transport)
}

client := http.Client{Transport: transport}

httpSender := pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {

// Send the request over the network
resp, err := client.Do(request.WithContext(ctx))

return pipeline.NewHTTPResponse(resp), err
}
})

p := blob.NewPipeline(c, blob.PipelineOptions{
Retry: retryOptions,
Telemetry: blob.TelemetryOptions{Value: "Tempo"},
HTTPSender: httpSender,
})

u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
u, err := url.Parse(fmt.Sprintf("https://%s.%s", cfg.StorageAccountName, cfg.Endpoint))

// If the endpoint doesn't start with blob.core we can assume Azurite is being used
// So the endpoint should follow Azurite URL style
if !strings.HasPrefix(conf.Endpoint, "blob.core") {
u, err = url.Parse(fmt.Sprintf("http://%s/%s", conf.Endpoint, conf.StorageAccountName))
if !strings.HasPrefix(cfg.Endpoint, "blob.core") {
u, err = url.Parse(fmt.Sprintf("http://%s/%s", cfg.Endpoint, cfg.StorageAccountName))
}

if err != nil {
Expand All @@ -65,7 +74,7 @@ func GetContainerURL(ctx context.Context, conf *Config, hedge bool) (blob.Contai

service := blob.NewServiceURL(*u, p)

return service.NewContainerURL(conf.ContainerName), nil
return service.NewContainerURL(cfg.ContainerName), nil
}

func GetContainer(ctx context.Context, conf *Config, hedge bool) (blob.ContainerURL, error) {
Expand Down
41 changes: 41 additions & 0 deletions tempodb/backend/azure/instrumentation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package azure

import (
"net/http"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
azureRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tempodb",
Name: "azure_request_duration_seconds",
Help: "Time spent doing Azure requests.",

Buckets: prometheus.ExponentialBuckets(0.005, 4, 6),
}, []string{"operation", "status_code"})
)

type instrumentedTransport struct {
observer prometheus.ObserverVec
next http.RoundTripper
}

func newInstrumentedTransport(next http.RoundTripper) http.RoundTripper {
return instrumentedTransport{
observer: azureRequestDuration,
next: next,
}
}

func (i instrumentedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
start := time.Now()
resp, err := i.next.RoundTrip(req)
if err == nil {
i.observer.WithLabelValues(req.Method, strconv.Itoa(resp.StatusCode)).Observe(time.Since(start).Seconds())
}
return resp, err
}